CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: PyTorch のオプティマイザを自作する

今回は、PyTorch でオプティマイザを自作する方法について紹介してみる。

きっかけは、勉強がてら主要なオプティマイザを自作してみようと思い至ったことだった。 その過程で、PyTorch でオプティマイザを自作する場合の流儀が把握できた。

そこで、この記事では以下のオプティマイザを書きながらその方法を説明してみる。

  • 単純な SGD (Stochastic Gradient Descent)
  • Momentum を導入した SGD

上記は最も古典的なオプティマイザだけど、実装することで基本的な機能を一通り紹介できるため。

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     14.6.1
BuildVersion:       23G93
$ python -V        
Python 3.11.9
$ pip list | grep -i torch
torch             2.4.0

もくじ

下準備

まずは PyTorch をインストールしておく。

$ pip install torch matplotlib

題材とする問題について

まず、オプティマイザを扱う以上は、最適化したい何らかの問題が必要になる。 今回の記事で題材とするのは f(x, y) = ax^2 + by^2 という関数にする。 関数には定数 a, b と変数 x, y が含まれる。 そして、これらの定数と変数を適当な値で初期化した上で、結果がゼロに近づくように最適化する。

上記の問題を、まずは PyTorch に組み込みで用意された SGD の実装を使って最適化してみよう。 要するに、まずはお手本となる結果を確認する。

サンプルコードは以下のとおり。 ExampleFunction というクラスが最適化したい関数を表している。 このクラスは nn.Module を継承しており、forward() メソッドで f(x, y) = ax^2 + by^2 に相当する順伝播を実装している。 コードでは、学習率 0.95 の SGD を使うことで、この関数の出力をゼロに近づけるようにパラメータを更新する。 また、更新のイテレーション回数は 30 回に決め打ちしている。 ちなみに、この問題設定や初期値などは「ゼロから作るDeep Learning 1」に記載されている内容と同一にしている。

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch import optim


class ExampleFunction(nn.Module):
    """最適化したい関数: f(x, y) = ax^2 + by^2"""

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x], dtype=torch.float32))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y], dtype=torch.float32))

    def forward(self):
        # f(x, y) = ax^2 + by^2
        return self.a * self.x**2 + self.b * self.y**2


def main():
    # 初期値を指定して最適化したいモデルをインスタンス化する
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)
    # SGD で最適化する
    optimizer = optim.SGD(model.parameters(), lr=0.95)

    # パラメータの軌跡を残す
    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    # 最適化のループを 30 回にわたって回す
    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        # 勾配を初期化する
        optimizer.zero_grad()

        # モデルの出力を得る
        outputs = model()

        # 誤差逆伝搬
        # 本来は損失関数を元に勾配を求める
        # 今回はパラメータがゼロへ近づくように最適化する
        outputs.backward()

        # パラメータを更新する
        optimizer.step()

        # 更新されたパラメータを記録する
        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    # パラメータのたどった軌跡を可視化する
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

上記に適当な名前をつけて実行してみよう。

$ python sgd.py

すると、次のようなグラフが得られる。 これは最適化の過程でパラメータの xy が更新されていく軌跡を表している。

SGD で最適化したパラメータの軌跡

上記から、それぞれのパラメータが 0 に近づいていく様子が確認できる。

SGD のオプティマイザを自作する

続いては、今回の本題となるオプティマイザの自作に入る。 初めに目指すところは、PyTorch 組み込みの SGD と全く同じ結果が得られるオプティマイザを作ること。

早速だけどサンプルコードを以下に示す。 このコードでは CustomSGD という名前でオプティマイザを実装している。 以降は、この CustomSGD について順を追って説明していく。

from collections.abc import Iterable

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch import optim
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x], dtype=torch.float32))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y], dtype=torch.float32))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


class CustomSGD(Optimizer):
    """自作した SGD のオプティマイザ

    PyTorch でオプティマイザを自作する場合は torch.optim.Optimizer を継承する
    """

    def __init__(self, params: Iterable, lr: float = 1e-3):
        # 最適化したいパラメータと、動作に必要なハイパーパラメータをスーパークラスの __init__() に渡す
        defaults = dict(
            lr=lr,
        )
        super(CustomSGD, self).__init__(params, defaults)

    def step(self, closure: None = None) -> None:
        """SGD の更新式を実装した step() メソッド

        (SGD の更新式)
        theta_{t+1} = theta_t - eta_t * grad(L(theta_t))

        theta: パラメータ (重み)
        eta: 学習率
        grad(L(theta)): 損失関数の勾配
        """
        # 複数のパラメータが辞書形式で渡された際には param_groups に分割して入る
        for group in self.param_groups:
            # グループには最適化したいパラメータや、動作に必要な設定が辞書形式で入っている
            for param in group["params"]:
                # 各パラメータごとに処理していく
                if param.grad is None:
                    # パラメータの勾配が計算されていないものは更新しない
                    continue
                # group に格納された学習率 (lr) と勾配を使ってパラメータの値を更新する
                param.data -= group["lr"] * param.grad


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)
    # 自作した SGD で最適化する
    optimizer = CustomSGD(model.parameters(), lr=0.95)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

まず、PyTorch でオプティマイザを作る場合には基本的に torch.optim.Optimizer を継承したクラスを作る。 その上で、実装する必要があるメソッドは __init__()step() の 2 つある。

__init__() メソッドについて

__init__() では、オプティマイザを初期化する。 このとき、最適化したいモデルのパラメータ (重み) と動作に必要なハイパーパラメータを辞書の形式で引数としてスーパークラスの __init__() を呼び出す。 詳しくは後述するものの、こうすることで torch.optim.Optimizer で実装されているインスタンス変数などがセットアップされて利用できるようになる。

step() メソッドについて

step() メソッドは、最適化する対象のパラメータの勾配を計算した上でユーザのコードから呼び出される。 こちらに、具体的なパラメータを更新する処理を記述する。

ちなみに、ドキュメントやソースコードを確認すると、定義する上でメソッドのシグネチャは以下の 3 通りから選べるようになっている。

def step(self, closure: None = ...) -> None:
def step(self, closure: Callable[[], float]) -> float:
def step(self, closure: Optional[Callable[[], float]] = None) -> Optional[float]:

異なるシグネチャが存在する理由は、アルゴリズムによって引数の closure を利用するかが選べるため。 一番上のシグネチャでは closure をまったく使用しないパターン、真ん中が必ず使用するパターン、一番下があってもなくてもどちらも許容するパターンになっている。

この closure という引数には、Callable[[], float] というタイプヒントから分かるように引数なしの呼び出し可能オブジェクトが渡される。 これは最適化するモデルの損失を float 型で返すもので、オプティマイザ内で損失を評価しながら何度もパラメータを更新する場合に使用するらしい。 ただし、実際に closure を有効に使用しているアルゴリズムはごく限られている (LBFGS など) ことから、通常は一番上か一番下を選択すれば良い。 PyTorch が組み込みで実装しているオプティマイザの多くは一番下のシグネチャを選択しているようだ 2。 今回のサンプルコードではシンプルさを優先して一番上を採用した。

続いては step() メソッドの具体的な実装方法について解説していく。 前述したスーパークラスの __init__() に渡された引数は、グループ単位で Optimizer#param_groups というメンバ変数に登録される。 ここでいうグループというのは、一つの Optimizer で異なる複数の最適化を同時に実行する場合に用いられる処理のまとまりのこと。 以下のコードでは、グループをループで取り出しながら処理している。 ちなみに、通常であればここには一つの要素しか入らない。

       # 複数のパラメータが辞書形式で渡された際には param_groups に分割して入る
        for group in self.param_groups:

グループを取り出したら、そこに辞書形式でパラメータや動作に必要な設定が入っている。 たとえば最適化の対象になるパラメータは "params" というキーで得られる。 以下のコードでは各パラメータを取り出して for ループでそれぞれ処理している。 ここでいうパラメータというのは、今回のタスクであれば xy に当たる。

            # グループには最適化したいパラメータや、動作に必要な設定が辞書形式で入っている
            for param in group["params"]:

パラメータによっては勾配が計算されていないことが想定される。 その場合には値を更新する必要がないというかできないので処理をスキップする。

                if param.grad is None:
                    # パラメータの勾配が計算されていないものは更新しない
                    continue

そして、肝心の SGD の更新式を実装している部分に入る。 まず、SGD の更新式は以下のとおり。

 \displaystyle
\theta_{t+1} = \theta_t - \eta \nabla_{\theta} L(\theta_t)

上記の数式と、プログラムの変数の対応を以下に示す。 学習率はスーパークラスの __init__()defaults を通して渡したことでグループに登録されている。

  •  \theta
    • param.data
  •  \eta
    • group["lr"]
  •  \nabla_{\theta} L(\theta)
    • param.grad

上記より、パラメータの更新は次のようなコードになる。

                # group に格納された学習率 (lr) と勾配を使ってパラメータの値を更新する
                param.data -= group["lr"] * param.grad

サンプルコードを実行する

一通り説明できたので、サンプルコードに適当な名前をつけて実行する。

$ python customsgd.py

すると、以下のようなグラフが得られる。

自作した SGD で最適化したパラメータの軌跡

上記から、先ほど実行した PyTorch 組み込みの SGD と全く同じ軌跡を辿っていることが確認できる。

複数のモデルを登録してみる

先ほどの例ではグループが一つしかない場合だった。 続いては、一つのオプティマイザに複数のモデルを登録する場合も試してみよう。

サンプルコードが以下になる。 このコードでは model1model2 という 2 つの最適化すべきモデルを一つのオプティマイザに登録している。

from collections.abc import Iterable
from typing import Any

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


class CustomSGD(Optimizer):

    def __init__(self, params: Iterable, lr: float = 1e-3):
        defaults: dict[str, Any] = dict(
            lr=lr,
        )
        super(CustomSGD, self).__init__(params, defaults)

    def step(self, closure: None = None) -> None:
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                param.data -= group["lr"] * param.grad


def main():
    # 最適化したい複数のモデル
    model1 = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)
    model2 = ExampleFunction(a=4, x=2.0, b=3, y=2.0)

    # 複数のモデルを一つのオプティマイザで最適化したい場合は、それぞれを辞書として渡す
    # 結果は torch.optim.Optimizer#param_groups の各要素として入る
    optimizer = CustomSGD(
        [
            {
                "params": model1.parameters(),
                "lr": 0.95,
            },
            {
                "params": model2.parameters(),
                "lr": 0.05,
            },
        ]
    )

    # それぞれの軌跡を残す
    trajectory_x1 = [model1.x.detach().numpy()[0]]
    trajectory_y1 = [model1.y.detach().numpy()[0]]
    trajectory_x2 = [model2.x.detach().numpy()[0]]
    trajectory_y2 = [model2.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs1 = model1()
        outputs1.backward()

        outputs2 = model2()
        outputs2.backward()

        optimizer.step()

        x1 = model1.x.detach().numpy()[0]
        trajectory_x1.append(x1)
        y1 = model1.y.detach().numpy()[0]
        trajectory_y1.append(y1)

        x2 = model2.x.detach().numpy()[0]
        trajectory_x2.append(x2)
        y2 = model2.y.detach().numpy()[0]
        trajectory_y2.append(y2)

    # 軌跡を可視化する
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x1, trajectory_y1, marker="o", markersize=5, label="Trajectory1")
    ax.plot(trajectory_x2, trajectory_y2, marker="o", markersize=5, label="Trajectory2")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

オプティマイザに登録している部分は以下のとおり。 リストの中に辞書形式で、複数のパラメータと学習率を登録している。

    # 複数のパラメータを最適化したい場合は、それぞれを辞書として渡す
    # 結果は torch.optim.Optimizer#param_groups の各要素として入る
    optimizer = CustomSGD(
        [
            {
                "params": model1.parameters(),
                "lr": 0.95,
            },
            {
                "params": model2.parameters(),
                "lr": 0.05,
            },
        ]
    )

上記のサンプルコードに名前をつけて実行してみる。

$ python groupsgd.py

すると、次のようなグラフが得られる。 異なる色の線が、それぞれのモデルのパラメータが更新されていく軌跡を表している。

自作した SGD で複数のモデルを最適化したパラメータの軌跡

上記から、それぞれのパラメータが 0 に向かって更新されていく様子が確認できる。

Momentum を導入した SGD を実装する

続いては SGD に Momentum の概念を導入する。 Momentum ではパラメータの更新にそれまでの勢いが加味されることから局所最適解に陥りにくくなる効果が見込める。

ここでは Momentum の実装を通して、オプティマイザで状態を表す変数の使い方を紹介したい。 というのも、先ほど実装した単純な SGD の更新式にはモデルのパラメータ以外に変数がなく、学習率も定数に過ぎなかった。 一方で Momentum では慣性を扱うことから、それまでのパラメータの更新のされ方を記録しておく必要がある。

早速だけど以下にサンプルコードを示す。 CustomMomentumSGD というクラスで Momentum を導入した SGD を実装している。 問題設定などは先ほどと変わらない。 ポイントは CustomMomentumSGDstep() メソッドの中で self.state というインスタンス変数を扱っているところ。 これを使うことで、オプティマイザに状態を持たせることができる。

from collections.abc import Iterable
from typing import Any

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


class CustomMomentumSGD(Optimizer):
    """自作した Momentum SGD のオプティマイザ"""

    def __init__(self, params: Iterable, lr: float = 1e-3, momentum: float = 0.9):
        defaults: dict[str, Any] = dict(
            lr=lr,
            momentum=momentum,
        )
        super(CustomMomentumSGD, self).__init__(params, defaults)

    def step(self, closure: None = None) -> None:
        """Momentum を導入した SGD の更新式を実装した step() メソッド

        (更新式)
        v_0 = 0
        v_{t+1} = gamma * v_t + grad(L(theta_t))
        theta_{t+1} = theta_t - eta * v_{t+1}

        theta: パラメータ (重み)
        gamma: モーメンタム係数
        v: モーメント
        eta: 学習率
        grad(L(theta)): 損失関数の勾配
        """
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                # v_0 = 0 に対応している
                if "v" not in self.state[param]:
                    # モーメントが存在しない初期状態であればゼロで初期化する
                    self.state[param]["v"] = torch.zeros_like(param.data)
                # v_{t+1} = gamma * v_t + grad(L(theta_t)) に対応している
                self.state[param]["v"] = (
                    group["momentum"] * self.state[param]["v"] + param.grad
                )
                # theta_{t+1} = theta_t - eta * v_{t+1} に対応している
                param.data -= group["lr"] * self.state[param]["v"]


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)

    # 自作した Momentum SGD で最適化する
    optimizer = CustomMomentumSGD(model.parameters(), lr=0.1, momentum=0.9)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

以降は単純な SGD との違いについて __init__() メソッドと step() のコードを見ていく。

__init__() メソッドについて

まず、__init__() では momentum という名前で float 型の引数が増えている。 これは、パラメータが更新される際の慣性の強さを指定するハイパーパラメータになっている。 この引数の値が大きいほど、それまでの勢いが強く反映された状態でパラメータが更新される。 学習率 (lr) と同じように、スーパークラスの __init__() に渡すことで group["momentum"] という形式でアクセスできるようになる。

step() メソッドについて

続いては step() メソッドについて。 このメソッドは Momentum を導入した SGD の更新式と共に見ていこう。 更新式は次のとおり。

 \displaystyle
v_0 = 0 \\
v_{t+1} = \gamma v_t + \nabla_{\theta} L(\theta_t) \\
\theta_{t+1} = \theta_t - \eta v_{t+1}

上記の数式と、プログラムの変数の対応を以下に示す。 SGD に比べると  \gamma v が増えている。 前述した通り self.state を使って「勢い」を状態として保持する。

  •  \theta
    • param.data
  •  \eta
    • group["lr"]
  •  \nabla_{\theta} L(\theta)
    • param.grad
  •  \gamma
    • group["momentum"]
  •  v
    • self.state[param]["v"]

はじめに以下の更新式に対応するコードから。

 \displaystyle
v_0 = 0

ここでは、要するに最初は状態が何もないので必要な変数をゼロで初期化している。

                if "v" not in self.state[param]:
                    # モーメントが存在しない初期状態であればゼロで初期化する
                    self.state[param]["v"] = torch.zeros_like(param.data)

次に、以下の更新式に対応するコード。

 \displaystyle
v_{t+1} = \gamma v_t + \nabla_{\theta} L(\theta_t)

ここでは過去の更新の勢いを加味しながら、新しい勾配を使って次の更新のされ方を決めている。 こういった、過去の値に係数をかけつつ新しい値を足していくやり方は指数移動平均と呼ばれる。 主要なオプティマイザのアルゴリズムでは、この指数移動平均の処理が頻出する。

                self.state[param]["v"] = group["momentum"] * self.state[param]["v"] + param.grad

最後に、以下の更新式に対応するコード。

 \displaystyle
\theta_{t+1} = \theta_t - \eta v_{t+1}

ここでは、先ほどのモーメントに学習率をかけたもので実際のパラメータを更新している。

                param.data -= group["lr"] * self.state[param]["v"]

サンプルコードを実行する

一通り説明できたので、サンプルコードに適当な名前をつけて実行する。

$ python custommomentum.py

すると、以下のようなグラフが得られる。

自作した Momentum SGD で最適化したパラメータの軌跡

単純な SGD とはアルゴリズムやハイパーパラメータが異なることから、パラメータの軌跡も異なることが確認できる。

PyTorch 組み込みの結果と比べる

念の為、PyTorch に組み込みで用意されている Momentum SGD と結果が揃うことを確認する。

サンプルコードは次のとおり。 オプティマイザを組み込みのものに差し替えた以外の違いはない。

from collections.abc import Iterable
from typing import Any

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch import optim
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)

    # PyTorch 組み込みの Momentum SGD で最適化する
    # SGD の引数に momentum を指定するだけ
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

上記に適当な名前をつけて実行する。

$ python torchmomentum.py

PyTorch 組み込みの Momentum SGD で最適化したパラメータの軌跡

自作した Momentum SGD と軌跡が一致していることが確認できる。

まとめ

今回は PyTorch でオプティマイザを自作する方法について紹介した。

参考

SGD と Momentum SGD の更新式は、以下の論文に記載されている内容を参考にした。

arxiv.org


  1. https://www.oreilly.co.jp/books/9784873117584/
  2. ただし closure が登録されているときは損失を受け取って、それをただメソッドの返り値にしているだけ

Python: isort で同じパッケージのインポートを 1 行ずつに分割する

isort は、Python のコードフォーマッタのひとつ。 使うことで、インポート文を Python の標準的なコーディング規約である PEP8 に沿った形で整形できる。

pycqa.github.io

ところで、PEP8 では同じパッケージのオブジェクトをインポートする際には 1 行にまとめても構わないとしている。

peps.python.org

ただ、これは完全に好みの問題だけど自分は同じパッケージのインポートであってもオブジェクトごとに 1 行ずつ分割したい。 1 行に 1 つのオブジェクトしか書かなければ、その行で何をインポートしているかが明確になって読みやすいと感じるため。

そこで、今回は isort で 1 行に 1 つのオブジェクトをインポートする方法について書く。 結論から先に述べると、コマンドラインオプションであれば --sl または --force-single-line-imports を付ければ良い。 また、設定ファイルを使って指定することもできる。

使った環境は次のとおり。

$ sw_vers                    
ProductName:        macOS
ProductVersion:     14.6.1
BuildVersion:       23G93
$ python -V        
Python 3.11.9

もくじ

下準備

下準備として isort をインストールする。

$ pip install isort

デフォルトの動作について

サンプルコードを用意する。 urllib.parse パッケージから parse_qs()urlparse() 関数の 2 つをインポートしている。

$ cat << 'EOF' > example.py
from urllib.parse import parse_qs
from urllib.parse import urlparse
EOF

上記のファイルに isort を実行する。

$ isort example.py

すると、同じパッケージからのインポートなので 1 行にまとめられる。

$ cat example.py           
from urllib.parse import parse_qs, urlparse

コマンドラインオプションで指定する

続いては、今回の本題である同じパッケージからのインポートであってもオブジェクトごとに 1 行ずつ分割する。 これには、コマンドラインオプションであれば --sl または --force-single-line-imports を指定すれば良い。

$ isort --sl example.py

実行すると、以下のように 1 行ずつに分割した形でインポートされる。

$ cat example.py 
from urllib.parse import parse_qs
from urllib.parse import urlparse

設定ファイルで指定する

とはいえ、毎回コマンドラインオプションを指定するのも大変なので設定ファイルにしたい。

pyproject.toml ファイルを使う場合

まず、最近の Python のプロジェクトであれば pyproject.toml を用意するのが一般的なはず。 ここで指定する場合には、次のような設定を追加する。

$ cat << 'EOF' >> pyproject.toml
[tool.isort]
force_single_line = true
EOF

あとは、設定ファイルのある場所で isort コマンドを実行するだけ。

.isort.cfg ファイルを使う場合

pyproject.toml を使わない場合には、専用の設定ファイルとして .isort.cfg を用意すれば良い。 こちらを使う場合には、次のような設定を使う。

$ cat << 'EOF' > .isort.cfg
[settings]
force_single_line=True
EOF

いじょう。

Python: Polars の shrink_dtype で DataFrame の使用メモリを削減する

Kaggle などのデータ分析コンペで使われるテクニックのひとつに reduce_mem_usage() 関数がある。 これは、一般に pandas の DataFrame のメモリ使用量を削減するために用いられる。 具体的には、カラムに出現する値を調べて、それを表現する上で必要最低限な型にキャストする。 たとえば、64 ビット整数のカラムを 32 ビット整数にできれば、理屈の上では必要なメモリ使用量がおよそ半分になる。

ただし、この関数は pandas が組み込みで提供しているわけではない。 そのため、各々がスニペットを秘伝のタレのように持っているか、あるいは必要に応じてウェブ上から探して利用する場合が多いはず。

一方で、Polars にはこれに相当する機能が組み込みで用意されている。 具体的には polars.Expr#shrink_dtype() という Expr オブジェクトを返すメソッドがある。 あまり知られていないようなので、今回は紹介してみる。

docs.pola.rs

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     14.6
BuildVersion:       23G80
$ python -V                      
Python 3.11.9
$ pip list | grep polars
polars        1.4.0

もくじ

下準備

まずは Polars をインストールしておく。

$ pip install polars

適当なデータセットとして Diamonds データセットをダウンロードする。

$ wget https://raw.githubusercontent.com/mwaskom/seaborn-data/master/diamonds.csv

Python のインタプリタを起動する。

$ python

CSV ファイルを読み込んで DataFrame オブジェクトを作る。 そのままだとサイズが分かりにくいので 200 個ほど連結してサイズをかさ増しする。

>>> import polars as pl
>>> raw_df = pl.read_csv("diamonds.csv")
>>> df = pl.concat([raw_df for _ in range(200)])
>>> df
shape: (10_788_000, 10)
┌───────┬───────────┬───────┬─────────┬───┬───────┬──────┬──────┬──────┐
│ carat ┆ cut       ┆ color ┆ clarity ┆ … ┆ price ┆ x    ┆ y    ┆ z    │
│ ------------     ┆   ┆ ------------  │
│ f64   ┆ str       ┆ str   ┆ str     ┆   ┆ i64   ┆ f64  ┆ f64  ┆ f64  │
╞═══════╪═══════════╪═══════╪═════════╪═══╪═══════╪══════╪══════╪══════╡
│ 0.23  ┆ Ideal     ┆ E     ┆ SI2     ┆ … ┆ 3263.953.982.43 │
│ 0.21  ┆ Premium   ┆ E     ┆ SI1     ┆ … ┆ 3263.893.842.31 │
│ 0.23  ┆ Good      ┆ E     ┆ VS1     ┆ … ┆ 3274.054.072.31 │
│ 0.29  ┆ Premium   ┆ I     ┆ VS2     ┆ … ┆ 3344.24.232.63 │
│ 0.31  ┆ Good      ┆ J     ┆ SI2     ┆ … ┆ 3354.344.352.75 │
│ …     ┆ …         ┆ …     ┆ …       ┆ … ┆ …     ┆ …    ┆ …    ┆ …    │
│ 0.72  ┆ Ideal     ┆ D     ┆ SI1     ┆ … ┆ 27575.755.763.5  │
│ 0.72  ┆ Good      ┆ D     ┆ SI1     ┆ … ┆ 27575.695.753.61 │
│ 0.7   ┆ Very Good ┆ D     ┆ SI1     ┆ … ┆ 27575.665.683.56 │
│ 0.86  ┆ Premium   ┆ H     ┆ SI2     ┆ … ┆ 27576.156.123.74 │
│ 0.75  ┆ Ideal     ┆ D     ┆ SI2     ┆ … ┆ 27575.835.873.64 │
└───────┴───────────┴───────┴─────────┴───┴───────┴──────┴──────┴──────┘

特に指定しない場合、デフォルトでは数値のカラムが 64 ビットの型になる。

>>> df.dtypes
[Float64, String, String, String, Float64, Float64, Int64, Float64, Float64, Float64]

この状況では DataFrame のサイズは約 683MB だった。

>>> df.estimated_size(unit="mb")
683.1520080566406

使用メモリを削減する

それでは、実際に polars.Expr.shrink_dtype() を使って DataFrame の使用メモリを減らしてみよう。 すべてのカラムを処理の対象とする場合には pl.all().shrink_dtype() とすれば良い。 あとは得られた Expr オブジェクトを DataFrame#select() に渡せば DataFrame が変換される。

>>> shrinked_df = df.select(pl.all().shrink_dtype())
>>> shrinked_df
shape: (10_788_000, 10)
┌───────┬───────────┬───────┬─────────┬───┬───────┬──────┬──────┬──────┐
│ carat ┆ cut       ┆ color ┆ clarity ┆ … ┆ price ┆ x    ┆ y    ┆ z    │
│ ------------     ┆   ┆ ------------  │
│ f32   ┆ str       ┆ str   ┆ str     ┆   ┆ i16   ┆ f32  ┆ f32  ┆ f32  │
╞═══════╪═══════════╪═══════╪═════════╪═══╪═══════╪══════╪══════╪══════╡
│ 0.23  ┆ Ideal     ┆ E     ┆ SI2     ┆ … ┆ 3263.953.982.43 │
│ 0.21  ┆ Premium   ┆ E     ┆ SI1     ┆ … ┆ 3263.893.842.31 │
│ 0.23  ┆ Good      ┆ E     ┆ VS1     ┆ … ┆ 3274.054.072.31 │
│ 0.29  ┆ Premium   ┆ I     ┆ VS2     ┆ … ┆ 3344.24.232.63 │
│ 0.31  ┆ Good      ┆ J     ┆ SI2     ┆ … ┆ 3354.344.352.75 │
│ …     ┆ …         ┆ …     ┆ …       ┆ … ┆ …     ┆ …    ┆ …    ┆ …    │
│ 0.72  ┆ Ideal     ┆ D     ┆ SI1     ┆ … ┆ 27575.755.763.5  │
│ 0.72  ┆ Good      ┆ D     ┆ SI1     ┆ … ┆ 27575.695.753.61 │
│ 0.7   ┆ Very Good ┆ D     ┆ SI1     ┆ … ┆ 27575.665.683.56 │
│ 0.86  ┆ Premium   ┆ H     ┆ SI2     ┆ … ┆ 27576.156.123.74 │
│ 0.75  ┆ Ideal     ┆ D     ┆ SI2     ┆ … ┆ 27575.835.873.64 │
└───────┴───────────┴───────┴─────────┴───┴───────┴──────┴──────┴──────┘

上記を見ても分かるとおり、行数や列数などは何も変わっていない。 ただし、カラムの型は必要最低限なものにキャストされている。

>>> shrinked_df.dtypes
[Float32, String, String, String, Float32, Float32, Int16, Float32, Float32, Float32]

処理後の DataFrame は、使用メモリが約 374MB まで減っている。

>>> shrinked_df.estimated_size(unit="mb")
374.5048522949219

いじょう。

まとめ

  • Polars には reduce_mem_usage() 関数に相当する機能が組み込みで用意されている
  • polars.Expr.shrink_dtype() という Expr オブジェクトを返す関数を使う

Python: PyTorch で AutoEncoder を書いてみる

PyTorch に慣れるためにコードをたくさん読み書きしていきたい。 今回は MNIST データセットを使ってシンプルな AutoEncoder を書いてみる。

使った環境は次のとおり。

$ sw_vers             
ProductName:        macOS
ProductVersion:     14.5
BuildVersion:       23F79
$ python -V
Python 3.11.9
$ sysctl machdep.cpu.brand_string
machdep.cpu.brand_string: Apple M2 Pro
$ pip list | egrep "(torch|matplotlib)"
matplotlib        3.9.0
torch             2.3.1
torchvision       0.18.1

もくじ

下準備

下準備として必要なパッケージをインストールする。

$ pip install torch torchvision matplotlib

サンプルコード

早速だけど以下がサンプルコードになる。 説明は適宜、コメントの形で挿入している。

#!/usr/bin/env python3

import random

import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms


def set_random_seed(seed):
    """シード値を設定する"""
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def computing_device(force=None):
    """環境によって計算に使うデバイスを切り替える関数"""
    if force is not None:
        return force
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps"
    return "cpu"


class AutoEncoder(nn.Module):
    """ボトルネック部分で 32 次元まで圧縮する 3 層 AutoEncoder モデル"""

    def __init__(self):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(
                in_features=28 * 28,
                out_features=128,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(
                in_features=128,
                out_features=64,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(
                in_features=64,
                out_features=32,
            ),
        )
        self.decoder = nn.Sequential(
            nn.Linear(
                in_features=32,
                out_features=64,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(
                in_features=64,
                out_features=128,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(
                in_features=128,
                out_features=28 * 28,
            ),
            nn.Sigmoid(),
        )

    def encode(self, x):
        """エンコードの処理をするメソッド"""
        return self.encoder(x)

    def forward(self, x):
        """順伝播"""
        x = self.encode(x)
        x = self.decoder(x)
        return x


def evaluate(model, dataloader, device, criterion):
    """評価に使う関数"""
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for data in dataloader:
            # ラベルは使用しない
            inputs, _ = data
            inputs = inputs.to(device)

            outputs = model(
                inputs,
            )

            loss = criterion(outputs, inputs)
            running_loss += loss.item()

    average_loss = running_loss / len(dataloader)

    return average_loss


def train(
    model,
    train_dataloader,
    valid_dataloader,
    device,
    criterion,
    optimizer,
    num_epochs,
    early_stopping_patience,
    checkpoint_path="checkpoint.pt",
):
    """学習に使う関数"""
    print(f"Device: {device}")

    # Early Stopping に使うカウンタ
    early_stopping_patience_counter = 0
    # Early Stopping に使う検証データに対する損失
    early_stopping_best_val_loss = float("inf")

    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss = 0.0
        for batch_idx, data in enumerate(train_dataloader):
            # ラベルは使用しない
            inputs, _ = data

            inputs = inputs.to(device)

            optimizer.zero_grad()
            outputs = model(
                inputs,
            )

            loss = criterion(outputs, inputs)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f"Epoch [{epoch}/{num_epochs}], Training Loss: {running_loss / len(train_dataloader):.5f}")

        val_loss = evaluate(model, valid_dataloader, device, criterion)
        print(f"Epoch [{epoch}/{num_epochs}], Validation Loss: {val_loss:.5f}")

        if early_stopping_patience == -1:
            continue

        if val_loss < early_stopping_best_val_loss:
            early_stopping_best_val_loss = val_loss
            early_stopping_patience_counter = 0
            # ベストなモデルとして Checkpoint を更新する
            checkpoint_params = {
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "loss": val_loss,
            }
            torch.save(
                checkpoint_params,
                checkpoint_path,
            )
        else:
            early_stopping_patience_counter += 1

        if early_stopping_patience_counter >= early_stopping_patience:
            print(f"Early stopping at epoch {epoch + 1}")
            break

    print("Training Finished")


def main():
    # 事前にシード値を固定する
    set_random_seed(42)

    # MNIST データセットを読み込む
    transform = transforms.Compose(
        [
            # PyTorch Tensor への変換と Min-Max Normalization
            transforms.ToTensor(),
            # (28, 28) -> (784,)
            transforms.Lambda(lambda x: torch.flatten(x)),
        ]
    )
    mnist_train_dataset = datasets.MNIST(
        root="dataset",
        train=True,
        download=True,
        transform=transform,
    )
    mnist_test_dataset = datasets.MNIST(
        root="dataset",
        train=False,
        download=True,
        transform=transform,
    )

    # 学習用のデータセットを学習用と検証用に分割する
    dataset_size = len(mnist_train_dataset)
    val_size = int(dataset_size * 0.2)
    train_size = dataset_size - val_size
    train_dataset, valid_dataset = random_split(
        mnist_train_dataset, (train_size, val_size)
    )

    # データローダを設定する
    batch_size = 64
    train_dataloader = DataLoader(
        mnist_train_dataset,
        batch_size=batch_size,
        shuffle=True,
    )
    valid_dataloader = DataLoader(
        valid_dataset,
        batch_size=batch_size,
        shuffle=False,
    )
    test_dataloader = DataLoader(
        mnist_test_dataset,
        batch_size=batch_size,
        shuffle=False,
    )

    # 最大エポック数
    num_epochs = 1_000
    # 改善が見られなかった場合に停止する Early Stopping のエポック数
    early_stopping_patience = 5

    # 学習に使うデバイス
    device = computing_device()

    # モデル
    model = AutoEncoder()
    model = model.to(device)

    # 損失関数
    criterion = nn.MSELoss()
    # オプティマイザ
    optimizer = optim.Adam(model.parameters())

    # 途中結果を記録するパス
    checkpoint_path = "MNIST-AE.pt"

    # 学習する
    train(
        model,
        train_dataloader,
        valid_dataloader,
        device,
        criterion,
        optimizer,
        num_epochs,
        early_stopping_patience,
        checkpoint_path,
    )

    # ベストなモデルをロードする
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint["model_state_dict"])
    best_epoch = checkpoint["epoch"]
    best_val_loss = checkpoint["loss"]

    # テストデータを評価する
    test_loss = evaluate(
        model,
        test_dataloader,
        device,
        criterion,
    )
    print(f"Epoch: {best_epoch}, Validation Loss: {best_val_loss:.5f}")
    print(f"Test Set Evaluation - Loss: {test_loss:.5f}")

    # テストデータに対する結果を可視化する
    model.eval()

    # 最初のミニバッチを取り出す
    mini_batch = next(iter(test_dataloader))

    # ミニバッチのデータをモデルに通す
    inputs, labels = mini_batch
    inputs = inputs.to(device)
    with torch.no_grad():
        outputs = model(
            inputs,
        ).to("cpu")
        encoded = model.encode(
            inputs,
        ).to("cpu")

    # ランダムに 10 点をサンプリングして可視化する
    sample_indices = random.sample(range(mini_batch[0].shape[0]), 10)
    fig, axes = plt.subplots(3, 10)
    for i, idx in enumerate(sample_indices):
        # 元の画像 (28 x 28)
        orig_img = mini_batch[0][idx].reshape(28, 28)
        axes[0][i].imshow(orig_img, cmap="gray")
        axes[0][i].axis("off")
        axes[0][i].set_title(labels[idx].numpy(), color="red")
        # ボトルネック部分での表現 (8 x 4)
        enc_img = encoded[idx].reshape(8, 4)
        axes[1][i].imshow(enc_img, cmap="gray")
        axes[1][i].axis("off")
        # 復元した画像 (28 x 28)
        pred_img = outputs[idx].reshape(28, 28)
        axes[2][i].imshow(pred_img, cmap="gray")
        axes[2][i].axis("off")

    plt.savefig("mnistae.png")
    plt.show()


if __name__ == "__main__":
    main()

上記を実行する。 エポックを重ねる毎に少しずつ損失が減っていく。

$ python mnistae.py
Device: mps
Epoch [1/1000], Training Loss: 0.04848
Epoch [1/1000], Validation Loss: 0.02846
Epoch [2/1000], Training Loss: 0.02480
Epoch [2/1000], Validation Loss: 0.02163
Epoch [3/1000], Training Loss: 0.01990
Epoch [3/1000], Validation Loss: 0.01821
...
Epoch [78/1000], Training Loss: 0.00544
Epoch [78/1000], Validation Loss: 0.00538
Epoch [79/1000], Training Loss: 0.00542
Epoch [79/1000], Validation Loss: 0.00542
Epoch [80/1000], Training Loss: 0.00541
Epoch [80/1000], Validation Loss: 0.00536
Early stopping at epoch 81
Training Finished
Epoch: 75, Validation Loss: 0.00532
Test Set Evaluation - Loss: 0.00542

実行が完了すると次のような可視化が得られる。

実行結果

それぞれ、以下のような意味がある。

  • 上段はモデルの入力となった画像を表している
  • 中段はモデルのボトルネック部分において圧縮された表現を可視化したもの
  • 下段はモデルが出力した画像を表している

Python: PyTorch の MLP で Iris データセットを分類してみる

ニューラルネットワークのことが何も分からないので少しずつでも慣れていきたい。 そのためには、とにかくたくさんのコードを読んで書くしかないと思う。 一環として、今回はこれ以上ないほど簡単なタスクを解くコードを書いてみる。 具体的には、シンプルな MLP (Multi Layer Perceptron) を使って Iris データセットの多値分類タスクを解く。 実用性はないけど PyTorch のやり方は一通り詰まっている感じがする。

使った環境は次のとおり。

$ sw_vers 
ProductName:        macOS
ProductVersion:     14.5
BuildVersion:       23F79
$ python -V        
Python 3.11.9
$ pip list | egrep "(torch|scikit-learn)"
scikit-learn      1.5.0
torch             2.3.0

もくじ

下準備

下準備として PyTorch と scikit-learn をインストールしておく。

$ pip install torch scikit-learn

サンプルコード

早速だけど以下にサンプルコードを示す。 説明は適宜コメントを入れている。 やっていることに対して行数が多い印象は受ける。 とはいえ、生の PyTorch を使う場合はこんな感じなんだろう。

#!/usr/bin/env python3

"""PyTorch を使って Iris データセットを分類する
モデルには 3 層 MLP を使用する
"""

import random

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn import datasets
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset


def set_seed(seed):
    """シード値を設定する"""
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


class OnMemoryDataset(Dataset):
    """オンメモリ上の NumPy / Tensor 配列を使用するデータセット"""

    def __init__(self, x, y):
        self.x = torch.from_numpy(x) if not torch.is_tensor(x) else x
        self.y = torch.from_numpy(y) if not torch.is_tensor(y) else y
        self.x = self.x.to(torch.float32)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]


class MLPClassifier(nn.Module):
    """3 層 MLP の分類器"""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 100)
        self.fc2 = nn.Linear(100, 50)
        self.fc3 = nn.Linear(50, 3)

    def forward(self, batch):
        out = self.fc1(batch)
        out = F.relu(out)
        out = self.fc2(out)
        out = F.relu(out)
        out = self.fc3(out)
        return out


def evaluate(model, dataloader, device, criterion):
    """評価・検証用データに対する損失や評価指標を求める関数"""
    # モデルを評価モードにする
    model.eval()
    all_labels = []
    all_preds = []
    running_loss = 0.0
    # 勾配は必要ない
    with torch.no_grad():
        # DataLoader からバッチを読み込む
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # 損失を求める
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    # 損失、Precision、Recall、F-1 スコアを求める
    average_loss = running_loss / len(dataloader)
    precision = precision_score(all_labels, all_preds, average="macro", zero_division=0)
    recall = recall_score(all_labels, all_preds, average="macro")
    f1 = f1_score(all_labels, all_preds, average="macro")

    return average_loss, precision, recall, f1


def acceleration_device(force=None):
    """環境によって計算に使うデバイスを切り替える関数"""
    if force is not None:
        return force
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps"
    return "cpu"


def main():
    # 乱数シードを指定する
    set_seed(42)

    # Iris データセットを読み込む
    x, y = datasets.load_iris(return_X_y=True)

    # 学習データと評価データに分割する
    train_x, test_x, train_y, test_y = train_test_split(
        x,
        y,
        shuffle=True,
        random_state=42,
        test_size=0.2,
    )

    # 学習データと検証データに分割する
    train_x, valid_x, train_y, valid_y = train_test_split(
        train_x,
        train_y,
        shuffle=True,
        random_state=42,
        test_size=0.3,
    )

    # DataLoader で読み込むバッチサイズ
    batch_size = 64

    # 学習データ
    train_dataset = OnMemoryDataset(
        train_x,
        train_y,
    )
    train_dataloader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
    )
    # 検証データ
    valid_dataset = OnMemoryDataset(
        valid_x,
        valid_y,
    )
    valid_dataloader = DataLoader(
        valid_dataset,
        batch_size=batch_size,
        shuffle=False,
    )
    # 評価データ
    test_dataset = OnMemoryDataset(
        test_x,
        test_y,
    )
    test_dataloader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
    )

    # モデル
    model = MLPClassifier()
    # 損失関数
    criterion = nn.CrossEntropyLoss()
    # オプティマイザ
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    # 計算に用いるデバイス
    device = acceleration_device()
    model = model.to(device)
    print(f"Device: {device}")

    # 最大エポック数
    num_epochs = 1000
    # ログを出力するバッチ間隔
    log_interval = -1
    # 改善が見られなかった場合に停止する Early Stopping のエポック数
    early_stopping_patience = 10
    # Early Stopping に使うカウンタ
    early_stopping_patience_counter = 0
    # Early Stopping に使う検証データに対する損失
    early_stopping_best_val_loss = float("inf")

    # エポックを回す
    for epoch in range(num_epochs):
        # モデルを学習モードにする
        model.train()
        # エポック単位で計算する損失
        running_loss = 0.0
        # DataLoader からバッチを読み込む
        for batch_idx, (inputs, labels) in enumerate(train_dataloader):
            # デバイスに転送する
            inputs, labels = inputs.to(device), labels.to(device)
            # 勾配を初期化する
            optimizer.zero_grad()
            # 出力を得る
            outputs = model(inputs)
            # 損失を求める
            loss = criterion(outputs, labels)
            # 勾配を求める
            loss.backward()
            # 最適化する
            optimizer.step()
            # バッチの損失を足す
            running_loss += loss.item()

            if log_interval == -1:
                continue

            # 指定されたバッチ数の間隔でログを出力する
            if (batch_idx + 1) % log_interval == 0:
                print(
                    f"Epoch [{epoch + 1}/{num_epochs}], Batch [{batch_idx + 1}/{len(train_dataloader)}], Training Loss: {loss.item():.4f}"
                )

        # エポックでの学習損失を求める
        print(
            f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {running_loss / len(train_dataloader):.4f}"
        )

        # 検証データに対する損失や評価指標をログに出力する
        val_loss, val_precision, val_recall, val_f1 = evaluate(
            model, valid_dataloader, device, criterion
        )
        print(
            f"Epoch [{epoch + 1}/{num_epochs}], Validation Loss: {val_loss:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1 Score: {val_f1:.4f}"
        )

        # 検証データの損失を元に Early Stopping の処理をする
        if val_loss < early_stopping_best_val_loss:
            early_stopping_best_val_loss = val_loss
            early_stopping_patience_counter = 0
        else:
            early_stopping_patience_counter += 1

        # 条件を満たしたら学習のループを抜ける
        if early_stopping_patience_counter >= early_stopping_patience:
            print(f"Early stopping at epoch {epoch + 1}")
            break

    # 学習が完了した
    print("Training Finished")

    # 評価データに対する損失や評価指標をログに出力する
    test_loss, test_precision, test_recall, test_f1 = evaluate(
        model, test_dataloader, device, criterion
    )
    print(
        f"Test Set Evaluation - Loss: {test_loss:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1 Score: {test_f1:.4f}"
    )


if __name__ == "__main__":
    main()

上記を実行してみよう。

$ python irismlp.py
Device: mps
Epoch [1/1000], Training Loss: 1.0881
Epoch [1/1000], Validation Loss: 1.0115, Precision: 0.1389, Recall: 0.3333, F1 Score: 0.1961
Epoch [2/1000], Training Loss: 1.0227
Epoch [2/1000], Validation Loss: 0.9958, Precision: 0.4691, Recall: 0.3667, F1 Score: 0.2536
Epoch [3/1000], Training Loss: 0.9711
Epoch [3/1000], Validation Loss: 0.9820, Precision: 0.4744, Recall: 0.6667, F1 Score: 0.5315
...(省略)...
Epoch [95/1000], Training Loss: 0.0695
Epoch [95/1000], Validation Loss: 0.1670, Precision: 0.9286, Recall: 0.9333, F1 Score: 0.9230
Epoch [96/1000], Training Loss: 0.0726
Epoch [96/1000], Validation Loss: 0.1752, Precision: 0.9286, Recall: 0.9333, F1 Score: 0.9230
Epoch [97/1000], Training Loss: 0.0694
Epoch [97/1000], Validation Loss: 0.1925, Precision: 0.9286, Recall: 0.9333, F1 Score: 0.9230
Early stopping at epoch 97
Training Finished
Test Set Evaluation - Loss: 0.0939, Precision: 1.0000, Recall: 1.0000, F1 Score: 1.0000

最終的に評価用のデータに対して高い評価指標が得られている。

いじょう。

Python: scikit-learn は v1.4 から Polars をサポートした

scikit-learn に組み込みで用意されている Transformer は、長らく入出力として NumPy 配列にしか対応していなかった。 その状況が変わったのは v1.2 以降で、Pandas の DataFrame を扱えるようになった。

blog.amedama.jp

そして v1.4 からは、ついに Polars の DataFrame もサポートされた。 今回は、実際にその機能を試してみよう。

scikit-learn.org

使った環境は次のとおり。

$ sw_vers    
ProductName:        macOS
ProductVersion:     14.5
BuildVersion:       23F79
$ python -V                      
Python 3.11.9
$ pip list | egrep "(scikit-learn|polars)"
polars        0.20.27
scikit-learn  1.5.0

もくじ

下準備

まずは scikit-learn と Polars をインストールする。

$ pip install scikit-learn polars

インストールできたら、次に Python のインタプリタを起動する。

$ python

続いて、動作確認用のデータを用意する。 今回は Iris データセットにした。 ひとまず NumPy 配列として読み込む。

>>> from sklearn.datasets import load_iris
>>> X, _ = load_iris(as_frame=False, return_X_y=True)
>>> X[:5]
array([[5.1, 3.5, 1.4, 0.2],
       [4.9, 3. , 1.4, 0.2],
       [4.7, 3.2, 1.3, 0.2],
       [4.6, 3.1, 1.5, 0.2],
       [5. , 3.6, 1.4, 0.2]])

続いて、適当な Transformer を用意する。 今回は StandardScaler にした。

>>> from sklearn.preprocessing import StandardScaler
>>> scaler = StandardScaler()

ひとまず、用意した StandardScaler のインスタンスを使ってデータを変換する。 すると、デフォルトでは NumPy 配列として結果が返ってくる。

>>> X_scaled = scaler.fit_transform(X)
>>> X_scaled[:5]
array([[-0.90068117,  1.01900435, -1.34022653, -1.3154443 ],
       [-1.14301691, -0.13197948, -1.34022653, -1.3154443 ],
       [-1.38535265,  0.32841405, -1.39706395, -1.3154443 ],
       [-1.50652052,  0.09821729, -1.2833891 , -1.3154443 ],
       [-1.02184904,  1.24920112, -1.34022653, -1.3154443 ]])

この振る舞いは従来から変わっていない。

出力を Polars の DataFrame にする

それでは、次に出力を Polars の DataFrame にしてみよう。

具体的には、先ほど用意した StandardScaler のインスタンスの set_output() メソッドを呼び出す。 このとき、引数の transform"polars" を指定する。

>>> scaler.set_output(transform="polars")

この状況で、もう一度データを変換してみよう。

>>> X_scaled = scaler.fit_transform(X)

すると、返ってくるのが Polars の DataFrame になる。

>>> type(X_scaled)
<class 'polars.dataframe.frame.DataFrame'>
>>> X_scaled
shape: (150, 4)
┌───────────┬───────────┬───────────┬───────────┐
│ x0        ┆ x1        ┆ x2        ┆ x3        │
│ ---       ┆ ---       ┆ ---       ┆ ---       │
│ f64       ┆ f64       ┆ f64       ┆ f64       │
╞═══════════╪═══════════╪═══════════╪═══════════╡
│ -0.9006811.019004  ┆ -1.340227 ┆ -1.315444 │
│ -1.143017 ┆ -0.131979 ┆ -1.340227 ┆ -1.315444 │
│ -1.3853530.328414  ┆ -1.397064 ┆ -1.315444 │
│ -1.5065210.098217  ┆ -1.283389 ┆ -1.315444 │
│ -1.0218491.249201  ┆ -1.340227 ┆ -1.315444 │
│ …         ┆ …         ┆ …         ┆ …         │
│ 1.038005  ┆ -0.1319790.8195961.448832  │
│ 0.553333  ┆ -1.2829630.7059210.922303  │
│ 0.795669  ┆ -0.1319790.8195961.053935  │
│ 0.4321650.7888080.9332711.448832  │
│ 0.068662  ┆ -0.1319790.7627580.790671  │
└───────────┴───────────┴───────────┴───────────┘

なお、入力が NumPy 配列なので、カラム名は x0, x1, x2, ... という命名規則で自動的に与えられている。

入力として Polars の DataFrame を与えてみる

次は入力として Polars の DataFrame を与えてみるパターンも試しておこう。

NumPy 配列を元にして Polars の DataFrame を作成する。

>>> import polars as pl
>>> col_names = {
...     "sepal_length",
...     "sepal_width",
...     "petal_length",
...     "petal_width",
... }
>>> df = pl.DataFrame(data=X, schema=col_names)

作った DataFrame を StandardScaler のインスタンスに渡す。 すると、結果が先ほどと同じように Polars の DataFrame として得られる。

>>> scaler.fit_transform(df)
shape: (150, 4)
┌──────────────┬─────────────┬──────────────┬─────────────┐
│ sepal_length ┆ petal_width ┆ petal_length ┆ sepal_width │
│ ---          ┆ ---         ┆ ---          ┆ ---         │
│ f64          ┆ f64         ┆ f64          ┆ f64         │
╞══════════════╪═════════════╪══════════════╪═════════════╡
│ -0.9006811.019004    ┆ -1.340227    ┆ -1.315444   │
│ -1.143017    ┆ -0.131979   ┆ -1.340227    ┆ -1.315444   │
│ -1.3853530.328414    ┆ -1.397064    ┆ -1.315444   │
│ -1.5065210.098217    ┆ -1.283389    ┆ -1.315444   │
│ -1.0218491.249201    ┆ -1.340227    ┆ -1.315444   │
│ …            ┆ …           ┆ …            ┆ …           │
│ 1.038005     ┆ -0.1319790.8195961.448832    │
│ 0.553333     ┆ -1.2829630.7059210.922303    │
│ 0.795669     ┆ -0.1319790.8195961.053935    │
│ 0.4321650.7888080.9332711.448832    │
│ 0.068662     ┆ -0.1319790.7627580.790671    │
└──────────────┴─────────────┴──────────────┴─────────────┘

ただし、今回は元の DataFrame にカラム名があるので、それがそのまま使われている。

常に出力を Polars の DataFrame にしたい

ここまでの例は、個別のインスタンスにおいて出力の形式を指定するやり方だった。 一方で、実際に使う場合にはデフォルトの形式を変更したいことは多いはず。

そんなときは sklearn.set_config() 関数が使える。 引数の transform_output"polars" を指定しておけばデフォルトの形式が Polars の DataFrame になる。

>>> from sklearn import set_config
>>> set_config(transform_output="polars")

上記を実行した後で、あらためて StandardScaler のインスタンスを作成しよう。

>>> scaler = StandardScaler()

そしてデータを変換させてみると、今度は最初から Polars の DataFrame が返ってくる。

>>> scaler.fit_transform(X)
shape: (150, 4)
┌───────────┬───────────┬───────────┬───────────┐
│ x0        ┆ x1        ┆ x2        ┆ x3        │
│ ---       ┆ ---       ┆ ---       ┆ ---       │
│ f64       ┆ f64       ┆ f64       ┆ f64       │
╞═══════════╪═══════════╪═══════════╪═══════════╡
│ -0.9006811.019004  ┆ -1.340227 ┆ -1.315444 │
│ -1.143017 ┆ -0.131979 ┆ -1.340227 ┆ -1.315444 │
│ -1.3853530.328414  ┆ -1.397064 ┆ -1.315444 │
│ -1.5065210.098217  ┆ -1.283389 ┆ -1.315444 │
│ -1.0218491.249201  ┆ -1.340227 ┆ -1.315444 │
│ …         ┆ …         ┆ …         ┆ …         │
│ 1.038005  ┆ -0.1319790.8195961.448832  │
│ 0.553333  ┆ -1.2829630.7059210.922303  │
│ 0.795669  ┆ -0.1319790.8195961.053935  │
│ 0.4321650.7888080.9332711.448832  │
│ 0.068662  ┆ -0.1319790.7627580.790671  │
└───────────┴───────────┴───────────┴───────────┘

いいかんじ。

まとめ

scikit-learn は v1.4 以降で Polars と連携させやすくなった。

Ubuntu 24.04 LTS に後から GUI (X Window System) を追加する

Ubuntu をサーバ版でインストールした場合、デスクトップ環境などはデフォルトで入らない。 しかし、後から必要になる場合もある。 そこで、今回は Ubuntu 24.04 LTS に、後から GUI を追加する方法を書く。

なお、この確認は LTS 版のリリースが出る度に実施している。 一部を除いて、やり方は Ubuntu 22.04 LTS と変わらなかった。

使った環境は次のとおり。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 24.04 LTS
Release:    24.04
Codename:   noble
$ uname -srm
Linux 6.8.0-31-generic aarch64

もくじ

下準備

あらかじめ APT のパッケージインデックスを最新の状態にしておく。

$ sudo apt-get update

デスクトップ環境をインストールする

デスクトップ環境が必要な場合は ubuntu-desktop パッケージをインストールする。

$ sudo apt-get -y install ubuntu-desktop

依存しているパッケージが多いのでインストールの完了までに時間がかかる。

インストールが終わったらマシンを再起動する。

$ sudo shutdown -r now

すると、自動的にデスクトップ環境が有効な状態で起動する。

Ubuntu 24.04 LTS のデスクトップ環境

X Window System をインストールする

デスクトップ環境は不要で X Window System だけが必要な場合もある。 その際は xserver-xorgxauth をインストールする。

$ sudo apt-get -y install xserver-xorg xauth

必要に応じて SSH クライアントに X11 Forwarding の設定を入れたり、システムにログインし直す。

動作確認のために、追加で x11-apps をインストールする。

$ sudo apt-get -y install x11-apps

xeyes(1) を実行する。

$ xeyes

上手くいけば次のようにアプリケーションが起動するはず。

xeyes(1)

いじょう。