CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: PyTorch で RMSProp を実装してみる

今回は、以下の記事の続きとして PyTorch で RMSProp のオプティマイザを実装してみる。

blog.amedama.jp

上記では PyTorch で Adagrad のオプティマイザを実装した。 Adagrad は学習率の調整に過去の勾配の平方和の累積を使っている。 このやり方には、イテレーションが進むと徐々に学習が進みにくくなってしまう問題がある。 そこで、RMSProp では学習率の調整に過去の勾配の平方和の指数移動平均を使っている。 これによって、徐々に学習が進みにくくなる問題を解決した。

使った環境は次のとおり。

$ sw_vers     
ProductName:        macOS
ProductVersion:     15.1
BuildVersion:       24B83
$ python -V                               
Python 3.12.7
$ pip list | egrep -i "(torch|matplotlib)"
matplotlib        3.9.2
torch             2.5.0

もくじ

下準備

あらかじめ PyTorch と Matplotlib をインストールしておく。

$ pip install torch matplotlib 

PyTorch 組み込みの RMSProp を試す

まずは PyTorch に組み込みで用意されている RMSProp の振る舞いを確認しておこう。

以下にサンプルコードを示す。 扱う問題は先に示した記事と同じもの。 この問題設定や初期値などは「ゼロから作るDeep Learning 1」に記載されている内容を参考にしている。 このサンプルコードでは、関数の出力をゼロに近づけるようにパラメータを更新する。

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch import optim


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)

    optimizer = optim.RMSprop(model.parameters(), lr=0.1)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

上記に名前をつけて実行する。

$ python torchrmsprop.py

すると、次のようなグラフが得られる。 グラフは、パラメータが更新されていく様子を表している。

RMSProp で最適化したパラメータの軌跡

RMSProp のアルゴリズムを実装する

次に RMSProp のオプティマイザを自作する。 サンプルコードを以下に示す。 サンプルコードでは CustomRMSProp という名前でオプティマイザを実装した。

from collections.abc import Iterable
from typing import Any

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


class CustomRMSProp(Optimizer):
    """自作した RMSProp のオプティマイザ"""

    def __init__(
        self,
        params: Iterable,
        lr: float = 1e-3,
        alpha=0.99,
        eps=1e-10,
        momentum=0.0,
    ):
        defaults: dict[str, Any] = dict(
            lr=lr,
            eps=eps,
            alpha=alpha,
            momentum=momentum,
        )
        super(CustomRMSProp, self).__init__(params, defaults)

    def step(self, closure=None):
        """RMSProp の更新式を実装した step() メソッド

        (更新式)
        v_0 = 0  ※ 論文では 1 で初期化している
        m_0 = 0
        v_{t+1} = rho * v_t + (1 - rho) * grad(L(theta_t))^2
        m_{t+1} = gamma * m_t + eta / (sqrt(v_{t+1}) + eps) * grad(L(theta_t))
        theta_{t+1} = theta_t - m_{t+1}

        theta: パラメータ (重み)
        eta: 学習率
        grad(L(theta)): 損失関数の勾配
        v: 過去の勾配の2乗の指数移動平均
        m: 過去の勾配を加味したモーメント
        rho: v を計算するときの指数移動平均の係数
        gamma: m を計算するときの指数移動平均の係数
        eps: ゼロ除算を防ぐための小さな値
        """
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                # v_0 = 0 に対応する
                if "v" not in self.state[param]:
                    self.state[param]["v"] = torch.zeros_like(param.data)
                # m_0 = 0 に対応する
                if "m" not in self.state[param]:
                    self.state[param]["m"] = torch.zeros_like(param.data)
                # v_{t+1} = rho * v_t + (1 - rho) * grad(L(theta_t))^2 に対応する
                self.state[param]["v"] = group["alpha"] * self.state[param]["v"] + (1 - group["alpha"]) * param.grad ** 2
                # m_{t+1} = gamma * m_t + eta / (sqrt(v_{t+1}) + eps) * grad(L(theta_t)) に対応する
                self.state[param]["m"] = group["momentum"] * self.state[param]["m"] + group["lr"] / (torch.sqrt(self.state[param]["v"]) + group["eps"]) * param.grad
                # theta_{t+1} = theta_t - m_{t+1} に対応する
                param.data -= self.state[param]["m"]


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)

    optimizer = CustomRMSProp(model.parameters(), lr=0.1)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

上記に適当な名前をつけて実行する。

$ python customrmsprop.py

すると、次のようなグラフが得られる。

自作した RMSProp で最適化したパラメータの軌跡

先ほど確認した PyTorch 組み込みの RMSProp の結果と一致していることが分かる。

更新式とコードの対応関係について

ここからは更新式とコードについて見ていく。 RMSProp の更新式は以下のようになっている。

 \displaystyle
v_0 = 0 \\
m_0 = 0 \\
v_{t+1} = \rho v_t + (1 - \rho) \nabla_{\theta} L(\theta_t)^2 \\
m_{t+1} = \gamma m_t + \eta \frac{1}{\sqrt{v_{t+1} + \epsilon}} \nabla_{\theta} L(\theta_t) \\
\theta_{t+1} = \theta_t - m_{t+1}

数式と、プログラムの変数の対応関係は次のとおり。

  •  \theta
    • param.data
  •  \eta
    • group["lr"]
  •  \nabla_{\theta} L(\theta)
    • param.grad
  •  \rho
    • group["alpha"]
  •  \gamma
    • group["momentum"]
  •  v
    • self.state[param]["v"]
  •  m
    • self.state[param]["m"]
  •  \epsilon
    • group["eps"]

式から、 v には過去の勾配の平方和の指数移動平均が入ることが分かる。 過去の勾配がどれくらい値に影響するかは  \rho の係数を使って制御する。 この係数が大きいほど過去の値が重視され、小さいほど直近の値が重視される。 また、 m の計算式から分かるようにパラメータの更新量を決めるためにモーメントが用いられている。 ここに関しても  \gamma を係数にした指数移動平均になっている。

コードとの対応関係を見ていこう。 まずは、以下の更新式に対応するコードから。

 \displaystyle
v_0 = 0 \\
m_0 = 0

ここでは  v m がまだない状態、つまり初期状態のとき変数をゼロで初期化している。

                # v_0 = 0 に対応する
                if "v" not in self.state[param]:
                    self.state[param]["v"] = torch.zeros_like(param.data)
                # m_0 = 0 に対応する
                if "m" not in self.state[param]:
                    self.state[param]["m"] = torch.zeros_like(param.data)

続いては以下の更新式に対応するコード。

 \displaystyle
v_{t+1} = \rho v_t + (1 - \rho) \nabla_{\theta} L(\theta_t)^2

ここでは勾配の平方和について指数移動平均を求めている。

                # v_{t+1} = rho * v_t + (1 - rho) * grad(L(theta_t))^2 に対応する
                self.state[param]["v"] = group["alpha"] * self.state[param]["v"] + (1 - group["alpha"]) * param.grad ** 2

続いて以下の更新式に対応するコード。

 \displaystyle
m_{t+1} = \gamma m_t + \eta \frac{1}{\sqrt{v_{t+1} + \epsilon}} \nabla_{\theta} L(\theta_t)

ここではパラメータの更新量について指数移動平均を求めている。

                # m_{t+1} = gamma * m_t + eta / (sqrt(v_{t+1}) + eps) * grad(L(theta_t)) に対応する
                self.state[param]["m"] = group["momentum"] * self.state[param]["m"] + group["lr"] / (torch.sqrt(self.state[param]["v"]) + group["eps"]) * param.grad

そして、最後の以下の更新式に対応するコード。

 \displaystyle
\theta_{t+1} = \theta_t - m_{t+1}

ここでは実際にパラメータを更新している。

                # theta_{t+1} = theta_t - m_{t+1} に対応する
                param.data -= self.state[param]["m"]

いじょう。

参考

arxiv.org

車に興味の無かった人がファミリーカーを検討する上で知っておくと良さそうなこと

住んでいる地域によっては車を持たなくても不便なく生活できる。 それでも、子どもが生まれるといったライフイベントを経て、車の購入を検討する場面が訪れることはある。 そんなとき、これまで車に興味が無かった場合は、ほとんど知識がゼロの状態から調べることが想定される。 この作業には時間がかかるし、なかなか参考になるまとまった情報も見つかりにくい。 かといって、調査が不足した状況で購入に踏み切ると、後から後悔を招く可能性も高まる。 そこで、この記事では自分が調べたり経験した内容をベースに、検討する上で知っておくと良さそうなことを書き連ねてみる。

この記事の想定読者は次のとおり。

  • 車に興味を持ったことがない
  • 車の購入を検討している (あるいは今後する予定がある)
  • 車を購入する場合には子どもを含む家族で利用する

ここでは車を「時間を選ばずに使える移動手段」や「移動できるプライベート空間」と割り切って捉える。 現時点で車について一定の知識や何らかのこだわりがある人は、それに基づいて検討するだけなので読む必要はない。

また、万人に車の所有をすすめる意図もない。 所有せずに借りて利用する方法として、次のような選択肢も考えられる。

  • レンタカー
  • カーシェアリング
  • カーリース

上記でレンタカーやカーシェアリングに関しては導入までのコストが低い。 そのため、利用できるならまずは実際に試してみるのが手っ取り早い。 なお、カーリースについては一定の契約期間や利用する上での条件がある。 また、一般的には中途解約で違約金が生じることから、導入する前にはしっかりと検討した方が良い。 車を借りる場合にも、どんな車種を借りるか選定するタイミングであればこの記事が参考になるかもしれない。

あらかじめ断っておくと、この記事は長い。

もくじ

車種を選ぶ上での基準について

車に興味のない人が最初に悩むことのひとつが、一体どんな車があるのか分からないところだと思う。 そして、調べてもそれぞれにどんな違いがあるのか、それが自分たちに合った車なのかが最初は判断しにくい。

そこで、まずは子どものいる家族が車を選定する上で考慮した方が良さそうなポイントをいくつか挙げていく。 実際のところ、いくつかの視点で絞り込むと車種の選択肢は自然に限られてくる。 この点は、すでに車を利用している別の家族を観察すると、似たような車種が多いことからも実感できる。

話の前提として、すべての面で完璧な車というのは存在しえない。 これは、ある点を重視すると別の点で妥協が必要になるようなトレードオフが生じやすいため。 極端な例を挙げると、スポーツカーは走行性能を上げるために乗員数や室内空間などを重視しない。 つまり、車種の選択は、どういった点を重視してどういった点を妥協するかに依存しやすい。

パワースライドドアの有無

いきなりだけど取り上げたいのがパワースライドドアの有無について。 これには、ショッピングセンターなどの狭い駐車場に車を止めて、子どもを乗り降りさせる場面をイメージしてほしい。

まず、子どもが小さい頃は抱っこしながら車のドアを開閉しなければいけない場面が多い。 このとき、自分たちの車の左右に車が止まっていると、ヒンジドア (一般的な開き戸のタイプ) は片手で開閉するのに苦労する。 さらに、ドアを大きく開けられない場合には、子どもをチャイルドシートへ乗せたり降ろしたりすることに困難を伴うことがある。 また、ヒンジドアは子どもが成長して自分でドアを開けるようになると、隣に駐車している車を傷つけてしまう (ドアパンチという) 可能性が高まる。

パワースライドドアの場合は、ドアノブを引くだけでドアが開く。 また、ドアが丸ごと後ろにスライドして目前からいなくなるので、スペースが狭くても乗せたり降ろしたりしやすい。 ドアパンチのリスクも小さい。

そして、パワースライドドアは子どものいる家族を主要なユーザに想定している車種に装備されやすい。 そのため、ひとまずパワースライドドアのついた車種から選ぶという作戦も取れる。 パワースライドドアのついた車種はそこまで多くないので、これだけでも候補となる車種は減る。

一方で、パワースライドドアの主要なデメリットも挙げておく。 まず、重量がかさむことから燃費に若干の悪影響がある。 また、機構が複雑なので故障したときの修理代は高くなる傾向にある。

車のサイズ

続いては車のサイズについて。 分かりやすいところでは、運転する上で大きなサイズの車は扱いにくいと感じる恐れがある。 この点は、自宅周辺など普段利用する道の広さにも依存する。 また、駐車場によっては駐車できる車のサイズに制約が設けられていることがある。

特に、都市部に多い機械式の立体駐車場には注意を要する。 何故なら、機械式の立体駐車場は高さの制約がきついことがあるため。 たとえば、利用する駐車場に高さ 1550mm 以下という制約があると、先に述べたパワースライドドアを備える車種はほとんど選択肢がなくなってしまう。 また、仮にパワースライドドアをあきらめたとしてもデメリットは残る。 これは、子どものお世話がしやすい室内寸法の大きな車種は必然的に背が高くなる傾向にあるため。

また、平置きの駐車場であっても車のサイズに制約があったり、狭いスペースは軽自動車専用になっていることがある。 そのため、利用する予定の駐車場に止められる車の寸法は少なくとも事前に必ず確認してほしい。

ちなみに、一般的な乗用車には次の 3 種類の規格がある。 それぞれに乗員数や寸法、エンジンの排気量などが定められている。 基本的に上にいくほど小さく、そして諸々のコストが安い傾向にある。

  • 軽自動車
  • 小型自動車
  • 普通自動車

小型自動車と普通自動車は総称して登録(自動)車と呼ぶことがある。 これは、軽自動車とその以外では法律上の扱いが異なっており、登録車は陸運支局への登録が必要になるため。

では単純に車は小さい方が良いのかというと、もちろんそんなことはない。 一度に乗せられる荷物の量が減ったり、あるいは乗れる人数が少なくなるといったデメリットがある。

一度に乗れる人数

サイズと相関しやすいものの、一度に乗れる人数も車選びでは重要なポイントになる。 たとえば、軽自動車であれば規格で乗員は 4 名以下と定められている。 そのため、もし一度に 5 名以上を乗せたいときはそもそも軽自動車は選択肢から外れる。

ちなみに、12 歳未満の子どもは 1.5 人で大人 1 人分の乗員として計算される。 とはいえ、現実的にはチャイルドシートやジュニアシートを使うので大人が座るのと同じ面積を占有する。 また、基本的にチャイルドシートやジュニアシートは助手席に設置することがエアバッグの関係から推奨されていない。 そのため、どの席に家族の誰がどのような形で乗るかは事前に想定しておく必要がある。

もし一度に 6 人以上を乗せたい場合は、一般的に車が 3 列目シートという装備を必要とある。 これは、一般的な乗用車であれば前席 (1列目) と後席 (2列目) だけのところに、さらに後ろに席 (3列目) があるというもの。 この 3 列目シートを備える車種は意外と少ない。 なお、3 列目シートの席は車種によっては次のような制約がある点に注意が必要となる。

  • チャイルドシートの設置が難しい場合がある
  • 他の席に比べて乗り心地に劣る場合がある

また、一定の人数が乗った状態で荷物がどれくらい乗せられるかも気をつけるべきポイントになる。 特に子どもが小さいときはベビーカーを積む必要があったり、お世話に使う荷物がかさばる傾向にある。 旅行など長距離・長時間の移動においては、特に荷物の積載量が車のボトルネックになりやすい。

子どものお世話のしやすさ

車内において食事 (授乳) や着替え (おむつ交換) など、子どものお世話をする場面がある。 車種によっては、そういった場面を想定して意図的にお世話をしやすく作られている。

代表的なものを挙げると、車内を歩いて移動できるようにしたもの。 これは機能的にはウォークスルーと呼ばれる。 雨が降っているタイミングや、運転手を交代するような場面で使いやすい。 車種によっては 1 列目から 3 列目までウォークスルーできる。

もちろん、単純に室内空間が広かったり天井が高い車種は車内でお世話がしやすい。 この点は、後席がびっくりするほど狭い車種もある 1 ので注意が必要になる。

他にも、細かい所でいうと食事や飲み物を置くためのシートバックテーブルの有無とか。 子どもは暑がりなので、後席にエアコンの送風口やサーキュレーターがあると夏場も涼しく過ごせる。 ただし、ここらへんはあらかじめ車についていなくても、ある程度は自分で工夫して後付けなどをする余地がある。

なお、室内空間が広かったり天井の高い車種のデメリットとしては、重心の高さなどから走行性能の妥協につながりやすい傾向がある。

安全性について

大切な家族を乗せるからには、安全性も気になるポイントのはず。 ここでは、なるべく定量的に評価している資料を参照したい。

まずは独立行政法人 自動車事故対策機構 (NASVA) が実施している自動車アセスメント (JNCAP) がある。 車の安全性は、一般的に予防安全性と衝突安全性に分けて考えられる。 NASVA の自動車アセスメントでは、国内で販売されている主要な車種について予防安全性と衝突安全性を評価している。 国内でのみ販売されている車種については、衝突安全性の評価に関するほとんど唯一の情報源になる 2

www.nasva.go.jp

一点、衝突安全性の結果を解釈する上で注意すべき点を挙げておく。 それは、前面衝突の評価が「車両単独事故」に相当する条件になっていること。 車両を固定された障害物にぶつけるので、衝突時に車両へかかる運動エネルギーは車両の質量に応じて異なる。 つまり、軽い車両ほど衝突時の運動エネルギーは小さい状況で評価されている。 一方で、側面衝突の評価では止まっている車両に動く車台をぶつけるので車両相互事故に相当する条件になる。 なお、評価項目や内容は定期的に更新されるので、ずっと同じわけではない。

次に、乗用車の種類と死亡事故の相関については公益財団法人 交通事故総合分析センター (ITARDA) の資料がある。 少し古いものの 1999 年に発行された ITARDA INFORMATION No.20 を参照する。

https://www.itarda.or.jp/contents/518/info20.pdfwww.itarda.or.jp

以下は上記の資料から抜粋したもの。 平成 8 年の「車両相互事故」における運転者 1,000 人あたりの死者数を示す。 わずかながら小さい自動車の方が死亡事故が起こりやすい傾向が確認できる。

種類 ベルト着用 ベルト非着用
軽乗用車 1.1 12.0
小型乗用車 0.6 7.0
普通乗用車 0.4 5.1

なお「車両単独事故」では、むしろ登録車の方が死亡事故が起こりやすい傾向を示している。

種類 ベルト着用 ベルト非着用
軽乗用車 19.2 34.4
小型乗用車 24.8 61.3
普通乗用車 23.5 96.8

資料ではその理由として、普通乗用車は事故直前に出していたスピード (危険認知速度) が速いためと述べられている。 たしかに危険認知速度の累積カーブを見ると軽自動車・小型自動車・普通自動車の順でスピードが速い傾向にある。 運動エネルギーは速度に関して二乗で大きくなるため、車のサイズに関わらずスピードを出すことは危ないと言える。

また、2020 年発行の No.133 では、軽自動車における AEB (衝突被害軽減ブレーキ) についての分析がある。 AEB は先進運転支援システム (ADAS) の一種で、衝突のリスクが高い場合に自動でブレーキをかけてくれる機能のこと。 予防安全性を高める効果や、衝突した際の被害を軽減することが期待できる。

https://www.itarda.or.jp/contents/8685/info133.pdfwww.itarda.or.jp

上記の分析では、軽自動車において AEB が事故の発生率を減少させる効果が示されている。 ここ数年における ADAS の進歩は目覚ましいことから、新しいシステムを搭載した車両を利用するモチベーションにつながる。 ちなみに運転が苦手なときは、今だと駐車を自動でやってくれる車両なんてのもある。

なお、安全性のトレードオフについても言及しておく。 まず、衝突安全性を重視すると室内空間の広さに制約を受けたり、重量が増して燃費に悪影響を与えることが考えられる。 予防安全性については、より高性能な SoC を必要とすることやセンサーの種類や数が増えることで開発・製造のコストに影響してくる。

チャイルドシートの安全性

ちなみに前述の NASVA は国内で販売されている主要なチャイルドシートの安全性や使い勝手に関するアセスメントも実施している。 一見するとどれも大した違いがないように思われるチャイルドシートも、事故が起こった際の安全性に違いがある。

www.nasva.go.jp

また、チャイルドシートは車両に固定する方法として主に以下の 2 種類がある。

  • シートベルトを利用するもの
  • 国際標準規格の ISOFIX という取付金具を利用するもの

特別な事情が無い限りは、ISOFIX のタイプを選ぶことで確実な固定を得るのがおすすめ。 日本では 2012 年以降に販売される新車に ISOFIX の設置が義務付けられている。 なお、すべての座席に装備されているわけではない点に留意が必要となる。

生涯費用について

続いて、車の所有とは切っても切り離せない関係にあるのがお金の話。 車は購入するのにお金がかかるだけでなく、維持するのにもお金がかかる。 そして、手放すときには大抵の場合にいくらかのお金が戻ってくる。 そこで、これらを合算した金額を生涯費用と捉える。

 生涯費用 = 購入金額 + 維持費 - 売却金額

購入金額と売却金額について

おそらく大抵の人にとって、自動車の購入金額はなるべく抑えたいはず。 ただし、単純に購入金額を抑えれば生涯費用も抑えられるとは限らない。 ここでは、その背景を説明する。

まず、乗用車は中古市場が発達している。 特にコロナ禍以降は、いくつかの理由で中古車の人気が高まっている。 これは、自身が所有している車を業者に売却するときに値段がつきやすいことを意味する。

中古車の値段は、市場の需給バランスで決まる傾向にある。 つまり、需要 (人気) があって、市場への供給が少ない車種ほど値段が高くなる。 たとえば、人気のない車種を選んで購入すると売却金額は低くなりやすい。 そういった事情もあって、購入金額を抑えることが必ずしも長い目で見たとき経済的に合理的とは限らない。 この、車を売却するときの値段の良し悪しは「リセールが良い・悪い (あるいは高い・低い)」と表現する。

中古車の価格を決めるパラメータは多岐にわたる。 車種以外で最も基本的なものとしては「年式」と「走行距離」がある。 乗用車は製造から年数が経つほど、そしてたくさん走るほど価値が落ちていく。 「メーカー」についても、一般的に海外メーカーは国内メーカーに比べて価値の下落が大きい傾向にある。 これは、経年を経たときの故障しやすさが一因として考えられる。

また、乗用車は一般的に数年から十数年ごとにフルモデルチェンジする。 フルモデルチェンジでは、車の見た目や中身がごそっと入れ替わる。 そうなると、型落ちになったモデルは通常であれば人気が落ちるので中古車も安くなる。

上記のような事情から、可能であれば自動車を購入する時点で、ある程度の出口戦略を考えておけると良い。 要するに、購入してから何年程度乗って、手放すときは何万円程度で売れそうかをあらかじめ想定しておく。 もちろんこれは単なる皮算用に過ぎないけど、何もないよりは車に使うお金の計画を立てやすくなる。

その車の実際の生涯費用は、乗り終わって手放したタイミングで確定する。 あとは乗っていた期間で均せば、その車を乗るのに月額でいくらかかっていたかが分かる。

維持費について

車はただ所有しているだけで、乗る乗らないに関わらず維持費がかかる。 ここでは、どういった費用がかかるのか代表的なものを挙げていく。

駐車場代

まず、駐車場がかからない家に住んでいない限りは駐車場代が毎月発生する。 住んでいる土地の値段が高ければ、駐車場の料金もそれに比例して高くなる。

自動車税 / 軽自動車税

1 年に 1 回、車を保有していると税金を支払う必要がある。 登録車は排気量に応じて概ね 3 ~ 4 万円ほどかかる。 軽自動車なら 1 万円で済む。

車検

車は 2 年ごと (新車のみ 3 年) に車検を受けることが義務付けられている。 車検では、その車が保安基準に適合しているかどうか検査される。

車検で必要になる費用は大きく分けて以下の 3 つがある。

  • 法定費用
    • 自動車重量税
    • 自賠責保険料 (強制保険)
    • 印紙代
  • 検査費用・手数料など
  • 整備費用

法定費用に関しては何処でやってもほとんど差がない。 一方で、検査費用・手数料と整備費用についてはお願いする業者によって変わってくる。

車検には法定費用があるため、検査費用や整備費用を安く抑えても数万円程度は必ずかかる。 また、経年や走行距離が増えるごとに、保安基準に適合させるのに要する整備費用が増える傾向にある。

任意保険

一般的に、自賠責保険 (強制保険) でカバーしきれない部分を補うために任意保険に加入する。 車種や加入する内容に依存するが、これには 1 年間で数万円程度はかかる。

消耗品類

自動車には走行や経年劣化によって定期的に交換を必要とする部品がある。 以下に主だったものを挙げてみる。 それぞれ、交換が必要になったタイミングで数千円から数万円がかかる。

  • エンジンオイル・オイルフィルター
  • エアコンフィルター
  • ワイパーゴム
  • 補機用バッテリー
  • タイヤ

燃料代・高速道路料金など

これについては言わずもがなで、乗れば乗っただけお金がかかる。

駆動方式について

自動車の駆動方式についても取り上げておく。 色々あって意外とややこしい。 車にこだわりがない場合、動けば何でも良いと思えるかもしれない。 しかしながら、駆動方式は前述した生涯費用に影響するため無視はできない。 なお、現在は特別な事情がない限り基本的にエンジン車かハイブリッド車の 2 種類から選ぶことになる。

駆動方式のスペックに登場する数値に関しては、ざっくり以下のようにイメージしておくと良い。 自宅の近くに坂道が多かったり、高速道路を使う機会があるなら意識した方が良いかも。

  • 出力 (馬力)
    • 平坦な道を速く走る能力
  • トルク
    • 坂道を速く登る能力

エンジン車

エンジン車は、ガソリンや軽油といった燃料を燃やす内燃機関だけで動く自動車のこと。 ハイブリッド車に比べると車体価格が抑えられる一方で燃費や走行性能が劣る傾向にある。

車種によってはエンジンの排気量 (燃焼室の数と大きさの積) や過給器の有無を選べる。 過給器は主にターボチャージャーで、通常はターボと略される。 過給器がついていると、仮想的な排気量を増やす効果がある。 特に、排気量の制約が強い軽自動車の場合は有無を選べることが多い。 過給器がつくと、一般的に出力とトルクが増して、燃費がわずかに落ちる傾向にある。

ハイブリッド車 (HV)

エンジンに加えてモーターで駆動する力も使う自動車をハイブリッド車と呼ぶ。 エンジン車に比べると燃費が良く、静粛性や走行性能に優れる傾向にある。

なお、燃費の差で車両価格の差額分の元を取るには長い距離 (典型的には 10 万キロなど) を乗る必要がある。 そのため、元を取るというよりはその他の観点 (静粛性、スペック、リセール、給油間隔など) で選ぶ方が現実的に感じる。

参考までに、元を取るまでの距離を求める方法を示す。 まず、使用するパラメータは次のとおり。

  • ハイブリッド車の車両価格 (円)
    •  C_h
  • エンジン車の車両価格 (円)
    •  C_g
  • ハイブリッド車の燃費 (km/L)
    •  F_h
  • エンジン車の燃費 (km/L)
    •  F_g
  • 燃料費 (円/L)
    •  P

まずは車両の価格差  \Delta C (円) を求める。

 \Delta C = C_h - C_g

1km あたりの燃料費の差  \Delta R (円/km) を求める。

 \Delta R = \frac{P}{F_h} - \frac{P}{F_g}

元が取れる走行距離  D (km) を求める

 D = \frac{\Delta C}{\Delta R}

ちなみに、ハイブリッド車にはシステムの電圧が低い「マイルドハイブリッド」と電圧が高い「ストロングハイブリッド」の 2 種類がある。 いずれにしても、エンジンとは別にモーターや駆動用バッテリーが必要になるためエンジン車に比べると価格は高い。

マイルドハイブリッドは、モーターの出力やトルクが低い。 そのため、発進時などエンジンが苦手とする場面をモーターでアシストする側面が強い。 価格やサイズの制約が強い軽自動車であってもマイルドハイブリッドならラインナップがある。 燃費を改善する効果は相対的に小さい (エンジン車に比べて 115% など)。

ストロングハイブリッドは、モーターの出力やトルクが高い。 そのため、エンジンは発電に徹してモーターのみで駆動するようなシステム構成 (シリーズ型ハイブリッド) すらある。 燃費を改善する効果は相対的に大きい (エンジン車に比べて 150% など)。

また、バッテリーのエネルギーだけで走れる距離を数十キロメートル程度まで伸ばしたプラグインハイブリッド (PHV) という派生形もある。 これはハイブリッドと電気自動車の中間のような存在で、車両価格はストロングハイブリッドよりもさらに高い。

電気自動車 (BEV)

バッテリーに貯めた電力とモーターのみで駆動する自動車を電気自動車と呼ぶ。 現状では内燃機関を積んだ車に対するメリットはさほどない割にデメリットは多い。

  • エネルギーの補充 (充電) に時間がかかる
  • エネルギーを補充できる場所が限られる
  • バッテリーが劣化すると航続距離が短くなる
  • 車両価格が高い
  • リセールが低い

何らかのこだわりがある場合を除いて、今のところ有効な選択肢となる場面は限られる。

燃料電池車 (FCV)

燃料 (一般的に水素) を電気化学反応させて取り出した電気でモーターを動かす自動車を燃料電池車と呼ぶ。 電気自動車と同様に、現時点で積極的に選ぶ理由には乏しい。

車種の具体例について

ここまでは選定基準などを挙げてきた。 具体的な話もした方が良いと思うので、以下の基準で車種をいくつか挙げてみる。

  • パワースライドドアを装備している
  • 室内空間が広い
  • 販売台数が多い
  • 国内メーカーが製造している

なお、新車の販売台数に関する統計情報は以下の Web サイトで確認できる。

こちらが軽自動車。

www.zenkeijikyo.or.jp

こちらが登録車。

www.jada.or.jp

以降は車種を挙げる際に、この記事を書いている時点における直近の販売台数で降順ソートしている。

軽自動車

軽自動車は車両価格が安くて維持費も低い。 乗員数が 4 名であることや、荷物がそんなに載らないことを許容できれば良い選択肢になるはず。 サイズが小さくて運転もしやすいので、最初の一台として選びやすい。

以下は軽のスーパーハイトワゴンと呼ばれるタイプで販売台数の多いモデルになる。 軽自動車の規格をギリギリまで使った上で、大きな室内空間を実現している。

  • ホンダ N-BOX
  • スズキ スペーシア
  • ダイハツ タント

5人乗り

一般的な乗用車において 5 人乗りは車種の選択肢が最も多い。 しかしながら、前述の条件を満たすものは意外と限られる。

以下は小型のハイトワゴンと呼ばれるタイプの車になる。 前述した軽のスーパーハイトワゴンを、そのまま幅と長さについて大きくしたイメージに近い。

  • トヨタ ルーミー
  • スズキ ソリオ

なお、トヨタ ルーミーに関してはダイハツ トールの OEM モデルになっている。 これは、別のメーカーが開発した車種を自社のブランドで売るというもの。

6人以上

6 人以上が乗れる乗用車は基本的には 3 列目シートを備えることになる。 次のようなニーズで行き着きやすい。

  • 子どもが2人以上いる
  • 知人や両親を乗せる機会がある
  • 荷物をたくさん載せたい

まずはコンパクトミニバンや S サイズミニバンと呼ばれるタイプ。 一般的な乗用車と大して変わらないサイズで 3 列目シートを備えている。 さすがに 3 列目は狭いので、長時間の利用はつらい感じ。 普段は 3 列目の席を折りたたんでおいて、いざという時だけ使うイメージ。

  • トヨタ シエンタ
  • ホンダ フリード

もう少し大きな、ミニバンや M サイズミニバンと呼ばれるタイプ。 これくらいのサイズになると 3 列目の席にも余裕があるので常用しやすい。 子どもが 3 人以上いるような家庭ではかなりの確率で選ばれる。

  • トヨタ ノア・ヴォクシー
  • ホンダ ステップワゴン
  • 日産 セレナ

なお、上記よりもさらに大きな L サイズミニバンと呼ばれるタイプもある。 とはいえ、この記事の想定読者が最初に検討するような車ではないように思う。

購入までの流れ

ここからは、実際に購入するまでの注意点などを挙げていく。

新車の場合

新車を購入する場合には、基本的にディーラー (販売代理店) へ行くことになる。

ディーラーに行くと待ち時間が多く入るので、滞在時間として半日程度は見積もっておいた方が良い。 あるいは用事があるなどして手早く済ませたい場合には、あらかじめ滞在時間について何時までと事前に伝える。 子どもが同伴するならキッズスペースの有無などを事前に確認しておくと良い。 また、店舗によって用意されている展示車・試乗車の種類や数は異なる。 そのため、確認したい車が揃っている店舗を事前に Web サイトで調べた上で予約を入れておくと間違いがない。

現地では、担当者からヒアリングなどを受けながら展示車の確認や試乗車の試乗を進める。 ちなみに、ディーラーで試乗したり実物を見学できる正味の時間は意外と短い。 試乗に関しては、基本的にはあらかじめ決まったコースをぐるっと数分走るだけ。 この点は、あらかじめこういった道を走りたいなどの希望があれば担当者に伝えることで多少の融通が効く。 とはいえ、高速道路を試乗したいといった要望は、おそらく実現が難しい場合が多いはず。 もし、長い時間をかけて色々な状況で確認したい場合には、ディーラーではなく自分でレンタカーを借りるなどした方が良い。

そして、試乗などが一通り終わると基本的には見積もりを作ってもらうことになる。 見積もりを作る際には、以下のような項目を担当者と話し合いながら決めていく。 なお、見積もり価格の感覚を得るために、事前にメーカーの Web サイトでセルフ見積もりはやっておいた方が良い。

  • 車種
  • グレード
  • 色 (外装・内装)
  • オプション
  • 支払い方法

見積もりが出たら、あとは帰るなり、見積もりを元にさらに商談を進めるなりの選択肢がある。 とはいえ、いきなり商談を進めるよりは、ひとまず一旦は帰って冷静に検討することをおすすめしたい。 商談を進めるにしても、少なくとも一回目の見積もりで購入することは、次のような理由から避けた方が良い。

  • 必要ないオプションが入っている可能性が高い
  • 十分な値引きが入っていない可能性が高い

帰る前には、営業活動に対する自分のスタンスを担当者へ明確に伝えておいた方が良い。 具体的には、ディーラーからの自発的な営業活動を受け入れるかどうか。 もし受け入れるなら、どのようなチャネル (電話、郵便など) を受けるか。 また、連絡しても良い時間帯などについても伝えておいた方が良い。 この点を明確に伝えていなかったとき、アポなしで自宅に訪問を受けるようなパターンも実際に経験したことがある。 もし、自身の意思にそぐわないような営業活動を受けるときは取引しないことをおすすめする。

ちなみに、ディーラーは名前が違えば基本的には違う会社が経営している。 また、メーカーとディーラーの間に直接の資本関係がないことも多い。 これは、メーカーとはまったく別の企業が販売代理店の契約を結んでそのメーカーの車を売っているパターン。 ちなみに経営が別の会社であれば、同じ車種であっても相見積もりを取って個別の商談が可能になる。

新車の場合、値引きが期待できる余地は中古車に比べると大きい。 値引きできる相場は車種や市況によって異なるため、あらかじめ調べた方が良い。

中古車の場合

続いては中古車を購入する場合について。

中古車は出物次第なので、基本的には中古車情報を扱っている Web サイトで条件に一致するものを探す場合が多い。 販売されている店舗の場所の兼ね合いもあるので、検索する際は直接訪問できる範囲に絞った方が良い。 気に入ったものがあれば、基本的には現地に赴いて状態の確認や試乗をした上で購入する。

中古車は一定期間で故障した際に保証される範囲が新車に比べると限られる傾向にある。 そういったリスクが気になるときはメーカーの認定中古車を選ぶと良い。 これには、次のようなメリットが考えられる。

  • 整備が行き届いた状態で販売される
  • メーカーによる認定プロセスがある
  • 保証期間が長い (1 年など)

デメリットとしては、その分だけ値段は高くなる。

それ以外だと、次のような策が考えられる。

  • 第三者機関による鑑定書がついた車両を選ぶ
  • 信頼できる販売店があるならそこで購入する
  • 料金を支払って追加の保証をつける

中古車の場合、値引きが期待できる余地は新車に比べると小さい。 これは、それぞれの車両ごとに査定に応じた値付けがされているため。 中古だとまったく同じ状態の商品は 2 つとして無いので、基本的には相場に応じた金額がつけられる。 そのため、現金での値引きを期待するよりは何かをオマケで付けてもらったりするイメージがある。

オプションについて

車には、車両本体とは別にオプションと呼ばれる装備を有料でつけることができる。 オプションには、メーカーオプションとディーラーオプションの 2 種類がある。

前述した通り、ディーラーでの一回目の見積もり内容には大きな利幅を期待できるオプションがいつの間にか含まれていることが多い。 具体的には、メンテナンスパックやガラスコーティングなど。 調べた上で、本当に必要と感じない限りは見積もりから外してもらった方が良い。

なお、オプションはたくさん付けてもリセールの金額には影響しにくい。 プラスに影響するものは、その車種で人気のある一部の装備に限られる。 具体例を挙げると大型のモニターを備えたカーナビゲーションや後席モニター、あるいはサンルーフなど。

メーカーオプション

メーカーオプションは、車両を製造するタイミングで装着される。 そのため、購入した後に追加で装着することは基本的にできない。

装着できるオプションの具体的な内容は、車種やグレードによって異なる。 例を挙げると全周囲カメラやディスプレイオーディオなどがラインナップされることがある。

ディーラーオプション

ディーラーオプションは、任意のタイミングで付け外しできる。 そのため、必要になったら後で追加すれば良い。 中古車であっても、ディーラーなどに持ち込んで装着してもらうことができる。 ただし、新車で購入するタイミングでつけると、金額に応じた一定の値引きが期待できる。

何をオプションにラインナップするかは、メーカーの方針によるところが大きい。 代表的なものをいくつか挙げてみる。

  • ドアバイザー
  • フロアマット
  • カーナビゲーション
  • ドライブレコーダー

ちなみに、ディーラーオプションになっている装備は、メーカーの純正ではない「社外品」が存在することも多い。 一般的に社外品は純正品よりも値段が安く、電子機器 (ドライブレコーダーなど) は純正品よりも高性能・多機能な場合が多い。 ただし、取り付けの手間や工賃が別にかかるなど、利用する上で何かしらのトレードオフは必ずある。

グレードについて

同じ名前の車種であっても、いくつかのグレードがラインナップされることが多い。 典型的には、それぞれのグレードで次のような点に違いが設けられている。

  • 排気量
  • 駆動方式
  • 内外装
  • メーカーオプション

注意点としては、人気の集まりやすいグレードがあること。 前述した通り、リセールは需給バランスに応じて決まりやすい。 そのため、グレードによる人気の違いがリセールに影響することも頭の片隅に置いた方が良い。

たとえば軽自動車にはクール系やカスタム系と呼ばれる主に内外装を変えたグレードがよく用意される。 一般的にノーマルのグレードより価格は高いが、差額をリセールで相殺できる可能性がある。

内外装の色について

外装色は複数の色から選べることが多い。 また、車種やグレードによっては内装色も選べることがある。 もちろん好きな色を選んだら良いんだけど、選ぶ上で知っておいた方が良さそうな話もある。

まず、色はリセールに影響する。 車種やグレードにも依存するものの、一般的には外装色は白と黒のリセールが高い場合が多い。 これは、人気のある色に偏りがあることが一因として考えられる。 影響する度合いとしては、人気の有無で 5 ~ 20 万円程度は売却するときの値段が変わってくるイメージ。

また、色はキズや汚れの目立ち方に影響する。 外装色の白と黒であれば、白の方がキズや汚れは目立ちにくいと言われている。 一方で、内装色に関しては黒の方がキズや汚れは目立ちにくいと言われている。

最後に、外装色は夜間における他車や歩行者からの視認性に影響する。 理由は単純で、明るい色の方が光を反射するので暗い場所でも目立ちやすい。 定量的な資料は持ち合わせていないけど、他車や歩行者からの視認性は地味に安全性に関わるポイントになる。

支払い方法について

車を買うときは、その支払い方法にいくつか選択肢がある。

まず一番分かりやすいのが現金払い。 これは、特定の期日までに現金を販売店に持参するか、あるいは銀行振込する。 一度にまとまった金額の現金は必要になるが、支払ってしまえば後は気にすることが何も無い。

次に、自動車ローンを組むやり方。 これは、自動車を買うための資金を何処からか借りて販売店に支払う。 そして、借りたところへ 5 ~ 10 年程度かけてお金を返していく。 自動車ローンを扱っている業者はディーラーや銀行など色々とある。 一般的には銀行などの方が金利が低い。 ローンによってはお金を返し終わるまで車の所有者が自身にならないことがある。 一度にまとまった金額を用意する必要がないので、金利が低ければその分を別の運用に回すなども考えられる。

そして、残価設定型クレジットを利用するやり方。 これは、あらかじめ契約で決まった期間について、特定の金額を支払って車を使用する。 そして、契約期間が満了したタイミングで「買い取り」「乗り換え」「返却」を選ぶ。 返却や乗り換えを選ぶ場合には、基本的に契約で大きなキズのないことや月あたりの走行距離などに一定の条件がある。 条件を満たさない場合には追加でお金を支払う必要がある。 所有者が自身にならないことが多いなど、制度的にはカーリースに近い。 この支払い方法は、ディーラーにとって期待できる利益が大きいことから積極的にすすめられることがある。

まとめ

ここまで、車に興味の無かった人がファミリーカーを検討する上で知っておいた方が良さそうなことを書き連ねてみた。 なお、冒頭に書いた「車に興味の無かった人」というのは、これを書いた自分自身のことを指している。 そのため、自分があらかじめ知っておきたかったと思える内容をできるだけ書いたつもり。 記事の中に誤りを見つけた際にはどうかやさしく教えてほしい。


  1. 具体例としてはトヨタのヤリスシリーズなど
  2. 海外でも販売している車種であればヨーロッパの Euro NCAP やアメリカの US-NCAP も参考にできる

Python: PyTorch で Adagrad を実装してみる

今回は、以下の記事の続きとして PyTorch で Adagrad を実装したオプティマイザを自作してみる。 以下の記事では単純な SGD と Momentum を導入した SGD を実装した。

blog.amedama.jp

今回扱う Adagrad のアルゴリズムではパラメータごとに学習率を自動で調整する。 これによって値の収束が早まったり、あるいは過剰に値が更新されることを防ぐことができる。 ただし、学習率の調整に過去の勾配の平方和だけを使っていることから、徐々に学習が進みにくくなってしまう問題がある。 この問題を解決するために RMSProp が提案された。

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     14.6.1
BuildVersion:       23G93
$ python -V                   
Python 3.11.9
$ pip list | egrep -i "(torch|matplotlib)"
matplotlib        3.9.2
torch             2.4.1

もくじ

下準備

下準備として PyTorch と Matplotlib をインストールしておく。

$ pip install torch matplotlib

PyTorch 組み込みの Adagrad を試す

まずは PyTorch 組み込みの Adagrad の動作を確認する。 扱う問題は先に示した記事と同じもの。 この問題設定や初期値などは「ゼロから作るDeep Learning 1」に記載されている内容と同一にしている。

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch import optim


class ExampleFunction(nn.Module):
    """最適化したい関数: f(x, y) = ax^2 + by^2"""

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        # f(x, y) = ax^2 + by^2
        return self.a * self.x**2 + self.b * self.y**2


def main():
    # 初期値を指定して最適化したいモデルをインスタンス化する
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)
    # Adagrad で最適化する
    optimizer = optim.Adagrad(model.parameters(), lr=1.5)

    # パラメータの軌跡を残す
    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    # 最適化のループを 30 回にわたって回す
    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        # 勾配を初期化する
        optimizer.zero_grad()

        # モデルの出力を得る
        outputs = model()

        # 誤差逆伝播
        # 本来は損失関数を元に勾配を求める
        # 今回はパラメータがゼロへ近づくように最適化する
        outputs.backward()

        # パラメータを更新する
        optimizer.step()

        # 更新されたパラメータを記録する
        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    # パラメータのたどった軌跡を可視化する
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

上記に適当な名前をつけて実行する。

$ python adagrad.py

すると、次のようなグラフが得られる。 これは、パラメータが更新されていく様子を表している。

Adagrad で最適化したパラメータの軌跡

Adagrad のアルゴリズムを実装する

続いては Adagrad のオプティマイザを自作してみよう。 早速だけどサンプルコードを以下に示す。 サンプルコードでは CustomAdagrad という名前でオプティマイザを実装している。 オプティマイザを実装する際の流儀や API については先に示した記事を参照してもらいたい。

from collections.abc import Iterable
from typing import Any

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


class CustomAdagrad(Optimizer):
    """自作した Adagrad のオプティマイザ"""

    def __init__(self, params: Iterable, lr: float = 1e-3, eps=1e-10):
        defaults: dict[str, Any] = dict(
            lr=lr,
            eps=eps,
        )
        super(CustomAdagrad, self).__init__(params, defaults)

    def step(self, closure=None):
        """Adagrad の更新式を実装した step() メソッド

        (更新式)
        v_0 = 0
        v_{t+1} = v_t + grad(L(theta_t))^2
        theta_{t+1} = theta_t - eta / (sqrt(v_{t+1}) + eps) * grad(L(theta_t))

        theta: パラメータ (重み)
        eta: 学習率
        grad(L(theta)): 損失関数の勾配
        v: 過去の勾配の平方和
        eps: ゼロ除算を防ぐための小さな値
        """
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                # v_0 = 0 に対応する
                if "v" not in self.state[param]:
                    self.state[param]["v"] = torch.zeros_like(param.data)
                # v_{t+1} = v_t + grad(L(theta_t))^2 に対応する
                self.state[param]["v"] += param.grad * param.grad
                # theta_{t+1} = theta_t - eta / (sqrt(v_{t+1}) + eps) * grad(L(theta_t)) に対応する
                param.data -= group["lr"] / (torch.sqrt(self.state[param]["v"]) + group["eps"]) * param.grad


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)

    # 自作したオプティマイザを使う
    optimizer = CustomAdagrad(model.parameters(), lr=1.5)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

上記に適当な名前をつけて実行してみよう。

$ python customadagrad.py

すると、以下のグラフが得られる。

自作した Adagrad で最適化したパラメータの軌跡

上記から PyTorch 組み込みの実装と振る舞いが一致していることが確認できる。

更新式とコードの対応関係

ここからは更新式とコードについて見ていく。 まず、Adagrad の更新式は以下のようになっている。

 \displaystyle
v_0 = 0 \\
v_{t+1} = v_t + \nabla_{\theta} L(\theta_t)^2 \\
\theta_{t+1} = \theta_t - \eta \frac{1}{\sqrt{v_{t+1} + \epsilon}} \nabla_{\theta} L(\theta_t)

数式と、プログラムの変数の対応関係は次のとおり。

  •  \theta
    • param.data
  •  \eta
    • group["lr"]
  •  \nabla_{\theta} L(\theta)
    • param.grad
  •  v
    • self.state[param]["v"]
  •  \epsilon
    • group["eps"]

式から、 v には勾配の平方和が蓄積されていくことが分かる。 そして、学習率と勾配から更新量を求める部分に  \frac{1}{\sqrt{v_{t+1} + \epsilon}} として挟まっていることが分かる。 これによって、過去の勾配を元にパラメータの更新される大きさが決まるようになっている。 逆数なので、分母が大きくなるほど更新される量は減っていく。 そして、 v は単純に増加していくので、イテレーションが進むごとに更新される量が減ることが分かる。  v はパラメータごとにあるので、過去に大きな勾配が求められた (= すでに大きく更新された) ものはなるべく更新されないように振る舞う。

コードとの対応関係を見ていこう。 まずは、以下の更新式に対応するコードから。

 \displaystyle
v_0 = 0

ここでは  v がまだない状態、つまり初期状態のとき変数をゼロで初期化している。

                # v_0 = 0 に対応する
                if "v" not in self.state[param]:
                    self.state[param]["v"] = torch.zeros_like(param.data)

続いては以下の更新式に対応するコード。

 \displaystyle
v_{t+1} = v_t + \nabla_{\theta} L(\theta_t)^2

ここでは求められたパラメータの勾配の二乗を、先ほど初期化した変数に足している。

                # v_{t+1} = v_t + grad(L(theta_t))^2 に対応する
                self.state[param]["v"] += param.grad * param.grad

最後に、以下の更新式に対応するコード。

 \displaystyle
\theta_{t+1} = \theta_t - \eta \frac{1}{\sqrt{v_{t+1} + \epsilon}} \nabla_{\theta} L(\theta_t)

ここでは先ほど求めた  v を使ってパラメータを更新している。

                # theta_{t+1} = theta_t - eta / (sqrt(v_{t+1}) + eps) * grad(L(theta_t)) に対応する
                param.data -= group["lr"] / (torch.sqrt(self.state[param]["v"]) + group["eps"]) * param.grad

いじょう。

参考

jmlr.org

arxiv.org

Python: PyTorch のオプティマイザを自作する

今回は、PyTorch でオプティマイザを自作する方法について紹介してみる。

きっかけは、勉強がてら主要なオプティマイザを自作してみようと思い至ったことだった。 その過程で、PyTorch でオプティマイザを自作する場合の流儀が把握できた。

そこで、この記事では以下のオプティマイザを書きながらその方法を説明してみる。

  • 単純な SGD (Stochastic Gradient Descent)
  • Momentum を導入した SGD

上記は最も古典的なオプティマイザだけど、実装することで基本的な機能を一通り紹介できるため。

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     14.6.1
BuildVersion:       23G93
$ python -V        
Python 3.11.9
$ pip list | grep -i torch
torch             2.4.0

もくじ

下準備

まずは PyTorch をインストールしておく。

$ pip install torch matplotlib

題材とする問題について

まず、オプティマイザを扱う以上は、最適化したい何らかの問題が必要になる。 今回の記事で題材とするのは f(x, y) = ax^2 + by^2 という関数にする。 関数には定数 a, b と変数 x, y が含まれる。 そして、これらの定数と変数を適当な値で初期化した上で、結果がゼロに近づくように最適化する。

上記の問題を、まずは PyTorch に組み込みで用意された SGD の実装を使って最適化してみよう。 要するに、まずはお手本となる結果を確認する。

サンプルコードは以下のとおり。 ExampleFunction というクラスが最適化したい関数を表している。 このクラスは nn.Module を継承しており、forward() メソッドで f(x, y) = ax^2 + by^2 に相当する順伝播を実装している。 コードでは、学習率 0.95 の SGD を使うことで、この関数の出力をゼロに近づけるようにパラメータを更新する。 また、更新のイテレーション回数は 30 回に決め打ちしている。 ちなみに、この問題設定や初期値などは「ゼロから作るDeep Learning 1」に記載されている内容と同一にしている。

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch import optim


class ExampleFunction(nn.Module):
    """最適化したい関数: f(x, y) = ax^2 + by^2"""

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x], dtype=torch.float32))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y], dtype=torch.float32))

    def forward(self):
        # f(x, y) = ax^2 + by^2
        return self.a * self.x**2 + self.b * self.y**2


def main():
    # 初期値を指定して最適化したいモデルをインスタンス化する
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)
    # SGD で最適化する
    optimizer = optim.SGD(model.parameters(), lr=0.95)

    # パラメータの軌跡を残す
    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    # 最適化のループを 30 回にわたって回す
    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        # 勾配を初期化する
        optimizer.zero_grad()

        # モデルの出力を得る
        outputs = model()

        # 誤差逆伝搬
        # 本来は損失関数を元に勾配を求める
        # 今回はパラメータがゼロへ近づくように最適化する
        outputs.backward()

        # パラメータを更新する
        optimizer.step()

        # 更新されたパラメータを記録する
        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    # パラメータのたどった軌跡を可視化する
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

上記に適当な名前をつけて実行してみよう。

$ python sgd.py

すると、次のようなグラフが得られる。 これは最適化の過程でパラメータの xy が更新されていく軌跡を表している。

SGD で最適化したパラメータの軌跡

上記から、それぞれのパラメータが 0 に近づいていく様子が確認できる。

SGD のオプティマイザを自作する

続いては、今回の本題となるオプティマイザの自作に入る。 初めに目指すところは、PyTorch 組み込みの SGD と全く同じ結果が得られるオプティマイザを作ること。

早速だけどサンプルコードを以下に示す。 このコードでは CustomSGD という名前でオプティマイザを実装している。 以降は、この CustomSGD について順を追って説明していく。

from collections.abc import Iterable

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch import optim
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x], dtype=torch.float32))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y], dtype=torch.float32))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


class CustomSGD(Optimizer):
    """自作した SGD のオプティマイザ

    PyTorch でオプティマイザを自作する場合は torch.optim.Optimizer を継承する
    """

    def __init__(self, params: Iterable, lr: float = 1e-3):
        # 最適化したいパラメータと、動作に必要なハイパーパラメータをスーパークラスの __init__() に渡す
        defaults = dict(
            lr=lr,
        )
        super(CustomSGD, self).__init__(params, defaults)

    def step(self, closure: None = None) -> None:
        """SGD の更新式を実装した step() メソッド

        (SGD の更新式)
        theta_{t+1} = theta_t - eta_t * grad(L(theta_t))

        theta: パラメータ (重み)
        eta: 学習率
        grad(L(theta)): 損失関数の勾配
        """
        # 複数のパラメータが辞書形式で渡された際には param_groups に分割して入る
        for group in self.param_groups:
            # グループには最適化したいパラメータや、動作に必要な設定が辞書形式で入っている
            for param in group["params"]:
                # 各パラメータごとに処理していく
                if param.grad is None:
                    # パラメータの勾配が計算されていないものは更新しない
                    continue
                # group に格納された学習率 (lr) と勾配を使ってパラメータの値を更新する
                param.data -= group["lr"] * param.grad


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)
    # 自作した SGD で最適化する
    optimizer = CustomSGD(model.parameters(), lr=0.95)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

まず、PyTorch でオプティマイザを作る場合には基本的に torch.optim.Optimizer を継承したクラスを作る。 その上で、実装する必要があるメソッドは __init__()step() の 2 つある。

__init__() メソッドについて

__init__() では、オプティマイザを初期化する。 このとき、最適化したいモデルのパラメータ (重み) と動作に必要なハイパーパラメータを辞書の形式で引数としてスーパークラスの __init__() を呼び出す。 詳しくは後述するものの、こうすることで torch.optim.Optimizer で実装されているインスタンス変数などがセットアップされて利用できるようになる。

step() メソッドについて

step() メソッドは、最適化する対象のパラメータの勾配を計算した上でユーザのコードから呼び出される。 こちらに、具体的なパラメータを更新する処理を記述する。

ちなみに、ドキュメントやソースコードを確認すると、定義する上でメソッドのシグネチャは以下の 3 通りから選べるようになっている。

def step(self, closure: None = ...) -> None:
def step(self, closure: Callable[[], float]) -> float:
def step(self, closure: Optional[Callable[[], float]] = None) -> Optional[float]:

異なるシグネチャが存在する理由は、アルゴリズムによって引数の closure を利用するかが選べるため。 一番上のシグネチャでは closure をまったく使用しないパターン、真ん中が必ず使用するパターン、一番下があってもなくてもどちらも許容するパターンになっている。

この closure という引数には、Callable[[], float] というタイプヒントから分かるように引数なしの呼び出し可能オブジェクトが渡される。 これは最適化するモデルの損失を float 型で返すもので、オプティマイザ内で損失を評価しながら何度もパラメータを更新する場合に使用するらしい。 ただし、実際に closure を有効に使用しているアルゴリズムはごく限られている (LBFGS など) ことから、通常は一番上か一番下を選択すれば良い。 PyTorch が組み込みで実装しているオプティマイザの多くは一番下のシグネチャを選択しているようだ 2。 今回のサンプルコードではシンプルさを優先して一番上を採用した。

続いては step() メソッドの具体的な実装方法について解説していく。 前述したスーパークラスの __init__() に渡された引数は、グループ単位で Optimizer#param_groups というメンバ変数に登録される。 ここでいうグループというのは、一つの Optimizer で異なる複数の最適化を同時に実行する場合に用いられる処理のまとまりのこと。 以下のコードでは、グループをループで取り出しながら処理している。 ちなみに、通常であればここには一つの要素しか入らない。

       # 複数のパラメータが辞書形式で渡された際には param_groups に分割して入る
        for group in self.param_groups:

グループを取り出したら、そこに辞書形式でパラメータや動作に必要な設定が入っている。 たとえば最適化の対象になるパラメータは "params" というキーで得られる。 以下のコードでは各パラメータを取り出して for ループでそれぞれ処理している。 ここでいうパラメータというのは、今回のタスクであれば xy に当たる。

            # グループには最適化したいパラメータや、動作に必要な設定が辞書形式で入っている
            for param in group["params"]:

パラメータによっては勾配が計算されていないことが想定される。 その場合には値を更新する必要がないというかできないので処理をスキップする。

                if param.grad is None:
                    # パラメータの勾配が計算されていないものは更新しない
                    continue

そして、肝心の SGD の更新式を実装している部分に入る。 まず、SGD の更新式は以下のとおり。

 \displaystyle
\theta_{t+1} = \theta_t - \eta \nabla_{\theta} L(\theta_t)

上記の数式と、プログラムの変数の対応を以下に示す。 学習率はスーパークラスの __init__()defaults を通して渡したことでグループに登録されている。

  •  \theta
    • param.data
  •  \eta
    • group["lr"]
  •  \nabla_{\theta} L(\theta)
    • param.grad

上記より、パラメータの更新は次のようなコードになる。

                # group に格納された学習率 (lr) と勾配を使ってパラメータの値を更新する
                param.data -= group["lr"] * param.grad

サンプルコードを実行する

一通り説明できたので、サンプルコードに適当な名前をつけて実行する。

$ python customsgd.py

すると、以下のようなグラフが得られる。

自作した SGD で最適化したパラメータの軌跡

上記から、先ほど実行した PyTorch 組み込みの SGD と全く同じ軌跡を辿っていることが確認できる。

複数のモデルを登録してみる

先ほどの例ではグループが一つしかない場合だった。 続いては、一つのオプティマイザに複数のモデルを登録する場合も試してみよう。

サンプルコードが以下になる。 このコードでは model1model2 という 2 つの最適化すべきモデルを一つのオプティマイザに登録している。

from collections.abc import Iterable
from typing import Any

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


class CustomSGD(Optimizer):

    def __init__(self, params: Iterable, lr: float = 1e-3):
        defaults: dict[str, Any] = dict(
            lr=lr,
        )
        super(CustomSGD, self).__init__(params, defaults)

    def step(self, closure: None = None) -> None:
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                param.data -= group["lr"] * param.grad


def main():
    # 最適化したい複数のモデル
    model1 = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)
    model2 = ExampleFunction(a=4, x=2.0, b=3, y=2.0)

    # 複数のモデルを一つのオプティマイザで最適化したい場合は、それぞれを辞書として渡す
    # 結果は torch.optim.Optimizer#param_groups の各要素として入る
    optimizer = CustomSGD(
        [
            {
                "params": model1.parameters(),
                "lr": 0.95,
            },
            {
                "params": model2.parameters(),
                "lr": 0.05,
            },
        ]
    )

    # それぞれの軌跡を残す
    trajectory_x1 = [model1.x.detach().numpy()[0]]
    trajectory_y1 = [model1.y.detach().numpy()[0]]
    trajectory_x2 = [model2.x.detach().numpy()[0]]
    trajectory_y2 = [model2.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs1 = model1()
        outputs1.backward()

        outputs2 = model2()
        outputs2.backward()

        optimizer.step()

        x1 = model1.x.detach().numpy()[0]
        trajectory_x1.append(x1)
        y1 = model1.y.detach().numpy()[0]
        trajectory_y1.append(y1)

        x2 = model2.x.detach().numpy()[0]
        trajectory_x2.append(x2)
        y2 = model2.y.detach().numpy()[0]
        trajectory_y2.append(y2)

    # 軌跡を可視化する
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x1, trajectory_y1, marker="o", markersize=5, label="Trajectory1")
    ax.plot(trajectory_x2, trajectory_y2, marker="o", markersize=5, label="Trajectory2")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

オプティマイザに登録している部分は以下のとおり。 リストの中に辞書形式で、複数のパラメータと学習率を登録している。

    # 複数のパラメータを最適化したい場合は、それぞれを辞書として渡す
    # 結果は torch.optim.Optimizer#param_groups の各要素として入る
    optimizer = CustomSGD(
        [
            {
                "params": model1.parameters(),
                "lr": 0.95,
            },
            {
                "params": model2.parameters(),
                "lr": 0.05,
            },
        ]
    )

上記のサンプルコードに名前をつけて実行してみる。

$ python groupsgd.py

すると、次のようなグラフが得られる。 異なる色の線が、それぞれのモデルのパラメータが更新されていく軌跡を表している。

自作した SGD で複数のモデルを最適化したパラメータの軌跡

上記から、それぞれのパラメータが 0 に向かって更新されていく様子が確認できる。

Momentum を導入した SGD を実装する

続いては SGD に Momentum の概念を導入する。 Momentum ではパラメータの更新にそれまでの勢いが加味されることから局所最適解に陥りにくくなる効果が見込める。

ここでは Momentum の実装を通して、オプティマイザで状態を表す変数の使い方を紹介したい。 というのも、先ほど実装した単純な SGD の更新式にはモデルのパラメータ以外に変数がなく、学習率も定数に過ぎなかった。 一方で Momentum では慣性を扱うことから、それまでのパラメータの更新のされ方を記録しておく必要がある。

早速だけど以下にサンプルコードを示す。 CustomMomentumSGD というクラスで Momentum を導入した SGD を実装している。 問題設定などは先ほどと変わらない。 ポイントは CustomMomentumSGDstep() メソッドの中で self.state というインスタンス変数を扱っているところ。 これを使うことで、オプティマイザに状態を持たせることができる。

from collections.abc import Iterable
from typing import Any

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


class CustomMomentumSGD(Optimizer):
    """自作した Momentum SGD のオプティマイザ"""

    def __init__(self, params: Iterable, lr: float = 1e-3, momentum: float = 0.9):
        defaults: dict[str, Any] = dict(
            lr=lr,
            momentum=momentum,
        )
        super(CustomMomentumSGD, self).__init__(params, defaults)

    def step(self, closure: None = None) -> None:
        """Momentum を導入した SGD の更新式を実装した step() メソッド

        (更新式)
        v_0 = 0
        v_{t+1} = gamma * v_t + grad(L(theta_t))
        theta_{t+1} = theta_t - eta * v_{t+1}

        theta: パラメータ (重み)
        gamma: モーメンタム係数
        v: モーメント
        eta: 学習率
        grad(L(theta)): 損失関数の勾配
        """
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                # v_0 = 0 に対応している
                if "v" not in self.state[param]:
                    # モーメントが存在しない初期状態であればゼロで初期化する
                    self.state[param]["v"] = torch.zeros_like(param.data)
                # v_{t+1} = gamma * v_t + grad(L(theta_t)) に対応している
                self.state[param]["v"] = (
                    group["momentum"] * self.state[param]["v"] + param.grad
                )
                # theta_{t+1} = theta_t - eta * v_{t+1} に対応している
                param.data -= group["lr"] * self.state[param]["v"]


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)

    # 自作した Momentum SGD で最適化する
    optimizer = CustomMomentumSGD(model.parameters(), lr=0.1, momentum=0.9)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

以降は単純な SGD との違いについて __init__() メソッドと step() のコードを見ていく。

__init__() メソッドについて

まず、__init__() では momentum という名前で float 型の引数が増えている。 これは、パラメータが更新される際の慣性の強さを指定するハイパーパラメータになっている。 この引数の値が大きいほど、それまでの勢いが強く反映された状態でパラメータが更新される。 学習率 (lr) と同じように、スーパークラスの __init__() に渡すことで group["momentum"] という形式でアクセスできるようになる。

step() メソッドについて

続いては step() メソッドについて。 このメソッドは Momentum を導入した SGD の更新式と共に見ていこう。 更新式は次のとおり。

 \displaystyle
v_0 = 0 \\
v_{t+1} = \gamma v_t + \nabla_{\theta} L(\theta_t) \\
\theta_{t+1} = \theta_t - \eta v_{t+1}

上記の数式と、プログラムの変数の対応を以下に示す。 SGD に比べると  \gamma v が増えている。 前述した通り self.state を使って「勢い」を状態として保持する。

  •  \theta
    • param.data
  •  \eta
    • group["lr"]
  •  \nabla_{\theta} L(\theta)
    • param.grad
  •  \gamma
    • group["momentum"]
  •  v
    • self.state[param]["v"]

はじめに以下の更新式に対応するコードから。

 \displaystyle
v_0 = 0

ここでは、要するに最初は状態が何もないので必要な変数をゼロで初期化している。

                if "v" not in self.state[param]:
                    # モーメントが存在しない初期状態であればゼロで初期化する
                    self.state[param]["v"] = torch.zeros_like(param.data)

次に、以下の更新式に対応するコード。

 \displaystyle
v_{t+1} = \gamma v_t + \nabla_{\theta} L(\theta_t)

ここでは過去の更新の勢いを加味しながら、新しい勾配を使って次の更新のされ方を決めている。 こういった、過去の値に係数をかけつつ新しい値を足していくやり方は指数移動平均と呼ばれる。 主要なオプティマイザのアルゴリズムでは、この指数移動平均の処理が頻出する。

                self.state[param]["v"] = group["momentum"] * self.state[param]["v"] + param.grad

最後に、以下の更新式に対応するコード。

 \displaystyle
\theta_{t+1} = \theta_t - \eta v_{t+1}

ここでは、先ほどのモーメントに学習率をかけたもので実際のパラメータを更新している。

                param.data -= group["lr"] * self.state[param]["v"]

サンプルコードを実行する

一通り説明できたので、サンプルコードに適当な名前をつけて実行する。

$ python custommomentum.py

すると、以下のようなグラフが得られる。

自作した Momentum SGD で最適化したパラメータの軌跡

単純な SGD とはアルゴリズムやハイパーパラメータが異なることから、パラメータの軌跡も異なることが確認できる。

PyTorch 組み込みの結果と比べる

念の為、PyTorch に組み込みで用意されている Momentum SGD と結果が揃うことを確認する。

サンプルコードは次のとおり。 オプティマイザを組み込みのものに差し替えた以外の違いはない。

from collections.abc import Iterable
from typing import Any

import torch
from matplotlib import pyplot as plt
from torch import nn
from torch import optim
from torch.optim import Optimizer


class ExampleFunction(nn.Module):

    def __init__(self, a, x, b, y):
        super(ExampleFunction, self).__init__()
        self.a = a
        self.x = nn.Parameter(torch.tensor([x]))
        self.b = b
        self.y = nn.Parameter(torch.tensor([y]))

    def forward(self):
        return self.a * self.x**2 + self.b * self.y**2


def main():
    model = ExampleFunction(a=1 / 20, x=-7.0, b=1.0, y=2.0)

    # PyTorch 組み込みの Momentum SGD で最適化する
    # SGD の引数に momentum を指定するだけ
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9)

    trajectory_x = [model.x.detach().numpy()[0]]
    trajectory_y = [model.y.detach().numpy()[0]]

    num_epochs = 30
    for epoch in range(1, num_epochs + 1):
        optimizer.zero_grad()

        outputs = model()

        outputs.backward()

        optimizer.step()

        x = model.x.detach().numpy()[0]
        trajectory_x.append(x)
        y = model.y.detach().numpy()[0]
        trajectory_y.append(y)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(trajectory_x, trajectory_y, marker="o", markersize=5, label="Trajectory")
    ax.legend()
    ax.grid(True)
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    plt.show()


if __name__ == "__main__":
    main()

上記に適当な名前をつけて実行する。

$ python torchmomentum.py

PyTorch 組み込みの Momentum SGD で最適化したパラメータの軌跡

自作した Momentum SGD と軌跡が一致していることが確認できる。

まとめ

今回は PyTorch でオプティマイザを自作する方法について紹介した。

参考

SGD と Momentum SGD の更新式は、以下の論文に記載されている内容を参考にした。

arxiv.org


  1. https://www.oreilly.co.jp/books/9784873117584/
  2. ただし closure が登録されているときは損失を受け取って、それをただメソッドの返り値にしているだけ

Python: isort で同じパッケージのインポートを 1 行ずつに分割する

isort は、Python のコードフォーマッタのひとつ。 使うことで、インポート文を Python の標準的なコーディング規約である PEP8 に沿った形で整形できる。

pycqa.github.io

ところで、PEP8 では同じパッケージのオブジェクトをインポートする際には 1 行にまとめても構わないとしている。

peps.python.org

ただ、これは完全に好みの問題だけど自分は同じパッケージのインポートであってもオブジェクトごとに 1 行ずつ分割したい。 1 行に 1 つのオブジェクトしか書かなければ、その行で何をインポートしているかが明確になって読みやすいと感じるため。

そこで、今回は isort で 1 行に 1 つのオブジェクトをインポートする方法について書く。 結論から先に述べると、コマンドラインオプションであれば --sl または --force-single-line-imports を付ければ良い。 また、設定ファイルを使って指定することもできる。

使った環境は次のとおり。

$ sw_vers                    
ProductName:        macOS
ProductVersion:     14.6.1
BuildVersion:       23G93
$ python -V        
Python 3.11.9

もくじ

下準備

下準備として isort をインストールする。

$ pip install isort

デフォルトの動作について

サンプルコードを用意する。 urllib.parse パッケージから parse_qs()urlparse() 関数の 2 つをインポートしている。

$ cat << 'EOF' > example.py
from urllib.parse import parse_qs
from urllib.parse import urlparse
EOF

上記のファイルに isort を実行する。

$ isort example.py

すると、同じパッケージからのインポートなので 1 行にまとめられる。

$ cat example.py           
from urllib.parse import parse_qs, urlparse

コマンドラインオプションで指定する

続いては、今回の本題である同じパッケージからのインポートであってもオブジェクトごとに 1 行ずつ分割する。 これには、コマンドラインオプションであれば --sl または --force-single-line-imports を指定すれば良い。

$ isort --sl example.py

実行すると、以下のように 1 行ずつに分割した形でインポートされる。

$ cat example.py 
from urllib.parse import parse_qs
from urllib.parse import urlparse

設定ファイルで指定する

とはいえ、毎回コマンドラインオプションを指定するのも大変なので設定ファイルにしたい。

pyproject.toml ファイルを使う場合

まず、最近の Python のプロジェクトであれば pyproject.toml を用意するのが一般的なはず。 ここで指定する場合には、次のような設定を追加する。

$ cat << 'EOF' >> pyproject.toml
[tool.isort]
force_single_line = true
EOF

あとは、設定ファイルのある場所で isort コマンドを実行するだけ。

.isort.cfg ファイルを使う場合

pyproject.toml を使わない場合には、専用の設定ファイルとして .isort.cfg を用意すれば良い。 こちらを使う場合には、次のような設定を使う。

$ cat << 'EOF' > .isort.cfg
[settings]
force_single_line=True
EOF

いじょう。

Python: Polars の shrink_dtype で DataFrame の使用メモリを削減する

Kaggle などのデータ分析コンペで使われるテクニックのひとつに reduce_mem_usage() 関数がある。 これは、一般に pandas の DataFrame のメモリ使用量を削減するために用いられる。 具体的には、カラムに出現する値を調べて、それを表現する上で必要最低限な型にキャストする。 たとえば、64 ビット整数のカラムを 32 ビット整数にできれば、理屈の上では必要なメモリ使用量がおよそ半分になる。

ただし、この関数は pandas が組み込みで提供しているわけではない。 そのため、各々がスニペットを秘伝のタレのように持っているか、あるいは必要に応じてウェブ上から探して利用する場合が多いはず。

一方で、Polars にはこれに相当する機能が組み込みで用意されている。 具体的には polars.Expr#shrink_dtype() という Expr オブジェクトを返すメソッドがある。 あまり知られていないようなので、今回は紹介してみる。

docs.pola.rs

使った環境は次のとおり。

$ sw_vers
ProductName:        macOS
ProductVersion:     14.6
BuildVersion:       23G80
$ python -V                      
Python 3.11.9
$ pip list | grep polars
polars        1.4.0

もくじ

下準備

まずは Polars をインストールしておく。

$ pip install polars

適当なデータセットとして Diamonds データセットをダウンロードする。

$ wget https://raw.githubusercontent.com/mwaskom/seaborn-data/master/diamonds.csv

Python のインタプリタを起動する。

$ python

CSV ファイルを読み込んで DataFrame オブジェクトを作る。 そのままだとサイズが分かりにくいので 200 個ほど連結してサイズをかさ増しする。

>>> import polars as pl
>>> raw_df = pl.read_csv("diamonds.csv")
>>> df = pl.concat([raw_df for _ in range(200)])
>>> df
shape: (10_788_000, 10)
┌───────┬───────────┬───────┬─────────┬───┬───────┬──────┬──────┬──────┐
│ carat ┆ cut       ┆ color ┆ clarity ┆ … ┆ price ┆ x    ┆ y    ┆ z    │
│ ------------     ┆   ┆ ------------  │
│ f64   ┆ str       ┆ str   ┆ str     ┆   ┆ i64   ┆ f64  ┆ f64  ┆ f64  │
╞═══════╪═══════════╪═══════╪═════════╪═══╪═══════╪══════╪══════╪══════╡
│ 0.23  ┆ Ideal     ┆ E     ┆ SI2     ┆ … ┆ 3263.953.982.43 │
│ 0.21  ┆ Premium   ┆ E     ┆ SI1     ┆ … ┆ 3263.893.842.31 │
│ 0.23  ┆ Good      ┆ E     ┆ VS1     ┆ … ┆ 3274.054.072.31 │
│ 0.29  ┆ Premium   ┆ I     ┆ VS2     ┆ … ┆ 3344.24.232.63 │
│ 0.31  ┆ Good      ┆ J     ┆ SI2     ┆ … ┆ 3354.344.352.75 │
│ …     ┆ …         ┆ …     ┆ …       ┆ … ┆ …     ┆ …    ┆ …    ┆ …    │
│ 0.72  ┆ Ideal     ┆ D     ┆ SI1     ┆ … ┆ 27575.755.763.5  │
│ 0.72  ┆ Good      ┆ D     ┆ SI1     ┆ … ┆ 27575.695.753.61 │
│ 0.7   ┆ Very Good ┆ D     ┆ SI1     ┆ … ┆ 27575.665.683.56 │
│ 0.86  ┆ Premium   ┆ H     ┆ SI2     ┆ … ┆ 27576.156.123.74 │
│ 0.75  ┆ Ideal     ┆ D     ┆ SI2     ┆ … ┆ 27575.835.873.64 │
└───────┴───────────┴───────┴─────────┴───┴───────┴──────┴──────┴──────┘

特に指定しない場合、デフォルトでは数値のカラムが 64 ビットの型になる。

>>> df.dtypes
[Float64, String, String, String, Float64, Float64, Int64, Float64, Float64, Float64]

この状況では DataFrame のサイズは約 683MB だった。

>>> df.estimated_size(unit="mb")
683.1520080566406

使用メモリを削減する

それでは、実際に polars.Expr.shrink_dtype() を使って DataFrame の使用メモリを減らしてみよう。 すべてのカラムを処理の対象とする場合には pl.all().shrink_dtype() とすれば良い。 あとは得られた Expr オブジェクトを DataFrame#select() に渡せば DataFrame が変換される。

>>> shrinked_df = df.select(pl.all().shrink_dtype())
>>> shrinked_df
shape: (10_788_000, 10)
┌───────┬───────────┬───────┬─────────┬───┬───────┬──────┬──────┬──────┐
│ carat ┆ cut       ┆ color ┆ clarity ┆ … ┆ price ┆ x    ┆ y    ┆ z    │
│ ------------     ┆   ┆ ------------  │
│ f32   ┆ str       ┆ str   ┆ str     ┆   ┆ i16   ┆ f32  ┆ f32  ┆ f32  │
╞═══════╪═══════════╪═══════╪═════════╪═══╪═══════╪══════╪══════╪══════╡
│ 0.23  ┆ Ideal     ┆ E     ┆ SI2     ┆ … ┆ 3263.953.982.43 │
│ 0.21  ┆ Premium   ┆ E     ┆ SI1     ┆ … ┆ 3263.893.842.31 │
│ 0.23  ┆ Good      ┆ E     ┆ VS1     ┆ … ┆ 3274.054.072.31 │
│ 0.29  ┆ Premium   ┆ I     ┆ VS2     ┆ … ┆ 3344.24.232.63 │
│ 0.31  ┆ Good      ┆ J     ┆ SI2     ┆ … ┆ 3354.344.352.75 │
│ …     ┆ …         ┆ …     ┆ …       ┆ … ┆ …     ┆ …    ┆ …    ┆ …    │
│ 0.72  ┆ Ideal     ┆ D     ┆ SI1     ┆ … ┆ 27575.755.763.5  │
│ 0.72  ┆ Good      ┆ D     ┆ SI1     ┆ … ┆ 27575.695.753.61 │
│ 0.7   ┆ Very Good ┆ D     ┆ SI1     ┆ … ┆ 27575.665.683.56 │
│ 0.86  ┆ Premium   ┆ H     ┆ SI2     ┆ … ┆ 27576.156.123.74 │
│ 0.75  ┆ Ideal     ┆ D     ┆ SI2     ┆ … ┆ 27575.835.873.64 │
└───────┴───────────┴───────┴─────────┴───┴───────┴──────┴──────┴──────┘

上記を見ても分かるとおり、行数や列数などは何も変わっていない。 ただし、カラムの型は必要最低限なものにキャストされている。

>>> shrinked_df.dtypes
[Float32, String, String, String, Float32, Float32, Int16, Float32, Float32, Float32]

処理後の DataFrame は、使用メモリが約 374MB まで減っている。

>>> shrinked_df.estimated_size(unit="mb")
374.5048522949219

いじょう。

まとめ

  • Polars には reduce_mem_usage() 関数に相当する機能が組み込みで用意されている
  • polars.Expr.shrink_dtype() という Expr オブジェクトを返す関数を使う

Python: PyTorch で AutoEncoder を書いてみる

PyTorch に慣れるためにコードをたくさん読み書きしていきたい。 今回は MNIST データセットを使ってシンプルな AutoEncoder を書いてみる。

使った環境は次のとおり。

$ sw_vers             
ProductName:        macOS
ProductVersion:     14.5
BuildVersion:       23F79
$ python -V
Python 3.11.9
$ sysctl machdep.cpu.brand_string
machdep.cpu.brand_string: Apple M2 Pro
$ pip list | egrep "(torch|matplotlib)"
matplotlib        3.9.0
torch             2.3.1
torchvision       0.18.1

もくじ

下準備

下準備として必要なパッケージをインストールする。

$ pip install torch torchvision matplotlib

サンプルコード

早速だけど以下がサンプルコードになる。 説明は適宜、コメントの形で挿入している。

#!/usr/bin/env python3

import random

import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms


def set_random_seed(seed):
    """シード値を設定する"""
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def computing_device(force=None):
    """環境によって計算に使うデバイスを切り替える関数"""
    if force is not None:
        return force
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps"
    return "cpu"


class AutoEncoder(nn.Module):
    """ボトルネック部分で 32 次元まで圧縮する 3 層 AutoEncoder モデル"""

    def __init__(self):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(
                in_features=28 * 28,
                out_features=128,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(
                in_features=128,
                out_features=64,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(
                in_features=64,
                out_features=32,
            ),
        )
        self.decoder = nn.Sequential(
            nn.Linear(
                in_features=32,
                out_features=64,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(
                in_features=64,
                out_features=128,
            ),
            nn.ReLU(inplace=True),
            nn.Linear(
                in_features=128,
                out_features=28 * 28,
            ),
            nn.Sigmoid(),
        )

    def encode(self, x):
        """エンコードの処理をするメソッド"""
        return self.encoder(x)

    def forward(self, x):
        """順伝播"""
        x = self.encode(x)
        x = self.decoder(x)
        return x


def evaluate(model, dataloader, device, criterion):
    """評価に使う関数"""
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for data in dataloader:
            # ラベルは使用しない
            inputs, _ = data
            inputs = inputs.to(device)

            outputs = model(
                inputs,
            )

            loss = criterion(outputs, inputs)
            running_loss += loss.item()

    average_loss = running_loss / len(dataloader)

    return average_loss


def train(
    model,
    train_dataloader,
    valid_dataloader,
    device,
    criterion,
    optimizer,
    num_epochs,
    early_stopping_patience,
    checkpoint_path="checkpoint.pt",
):
    """学習に使う関数"""
    print(f"Device: {device}")

    # Early Stopping に使うカウンタ
    early_stopping_patience_counter = 0
    # Early Stopping に使う検証データに対する損失
    early_stopping_best_val_loss = float("inf")

    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss = 0.0
        for batch_idx, data in enumerate(train_dataloader):
            # ラベルは使用しない
            inputs, _ = data

            inputs = inputs.to(device)

            optimizer.zero_grad()
            outputs = model(
                inputs,
            )

            loss = criterion(outputs, inputs)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f"Epoch [{epoch}/{num_epochs}], Training Loss: {running_loss / len(train_dataloader):.5f}")

        val_loss = evaluate(model, valid_dataloader, device, criterion)
        print(f"Epoch [{epoch}/{num_epochs}], Validation Loss: {val_loss:.5f}")

        if early_stopping_patience == -1:
            continue

        if val_loss < early_stopping_best_val_loss:
            early_stopping_best_val_loss = val_loss
            early_stopping_patience_counter = 0
            # ベストなモデルとして Checkpoint を更新する
            checkpoint_params = {
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "loss": val_loss,
            }
            torch.save(
                checkpoint_params,
                checkpoint_path,
            )
        else:
            early_stopping_patience_counter += 1

        if early_stopping_patience_counter >= early_stopping_patience:
            print(f"Early stopping at epoch {epoch + 1}")
            break

    print("Training Finished")


def main():
    # 事前にシード値を固定する
    set_random_seed(42)

    # MNIST データセットを読み込む
    transform = transforms.Compose(
        [
            # PyTorch Tensor への変換と Min-Max Normalization
            transforms.ToTensor(),
            # (28, 28) -> (784,)
            transforms.Lambda(lambda x: torch.flatten(x)),
        ]
    )
    mnist_train_dataset = datasets.MNIST(
        root="dataset",
        train=True,
        download=True,
        transform=transform,
    )
    mnist_test_dataset = datasets.MNIST(
        root="dataset",
        train=False,
        download=True,
        transform=transform,
    )

    # 学習用のデータセットを学習用と検証用に分割する
    dataset_size = len(mnist_train_dataset)
    val_size = int(dataset_size * 0.2)
    train_size = dataset_size - val_size
    train_dataset, valid_dataset = random_split(
        mnist_train_dataset, (train_size, val_size)
    )

    # データローダを設定する
    batch_size = 64
    train_dataloader = DataLoader(
        mnist_train_dataset,
        batch_size=batch_size,
        shuffle=True,
    )
    valid_dataloader = DataLoader(
        valid_dataset,
        batch_size=batch_size,
        shuffle=False,
    )
    test_dataloader = DataLoader(
        mnist_test_dataset,
        batch_size=batch_size,
        shuffle=False,
    )

    # 最大エポック数
    num_epochs = 1_000
    # 改善が見られなかった場合に停止する Early Stopping のエポック数
    early_stopping_patience = 5

    # 学習に使うデバイス
    device = computing_device()

    # モデル
    model = AutoEncoder()
    model = model.to(device)

    # 損失関数
    criterion = nn.MSELoss()
    # オプティマイザ
    optimizer = optim.Adam(model.parameters())

    # 途中結果を記録するパス
    checkpoint_path = "MNIST-AE.pt"

    # 学習する
    train(
        model,
        train_dataloader,
        valid_dataloader,
        device,
        criterion,
        optimizer,
        num_epochs,
        early_stopping_patience,
        checkpoint_path,
    )

    # ベストなモデルをロードする
    checkpoint = torch.load(checkpoint_path)
    model.load_state_dict(checkpoint["model_state_dict"])
    best_epoch = checkpoint["epoch"]
    best_val_loss = checkpoint["loss"]

    # テストデータを評価する
    test_loss = evaluate(
        model,
        test_dataloader,
        device,
        criterion,
    )
    print(f"Epoch: {best_epoch}, Validation Loss: {best_val_loss:.5f}")
    print(f"Test Set Evaluation - Loss: {test_loss:.5f}")

    # テストデータに対する結果を可視化する
    model.eval()

    # 最初のミニバッチを取り出す
    mini_batch = next(iter(test_dataloader))

    # ミニバッチのデータをモデルに通す
    inputs, labels = mini_batch
    inputs = inputs.to(device)
    with torch.no_grad():
        outputs = model(
            inputs,
        ).to("cpu")
        encoded = model.encode(
            inputs,
        ).to("cpu")

    # ランダムに 10 点をサンプリングして可視化する
    sample_indices = random.sample(range(mini_batch[0].shape[0]), 10)
    fig, axes = plt.subplots(3, 10)
    for i, idx in enumerate(sample_indices):
        # 元の画像 (28 x 28)
        orig_img = mini_batch[0][idx].reshape(28, 28)
        axes[0][i].imshow(orig_img, cmap="gray")
        axes[0][i].axis("off")
        axes[0][i].set_title(labels[idx].numpy(), color="red")
        # ボトルネック部分での表現 (8 x 4)
        enc_img = encoded[idx].reshape(8, 4)
        axes[1][i].imshow(enc_img, cmap="gray")
        axes[1][i].axis("off")
        # 復元した画像 (28 x 28)
        pred_img = outputs[idx].reshape(28, 28)
        axes[2][i].imshow(pred_img, cmap="gray")
        axes[2][i].axis("off")

    plt.savefig("mnistae.png")
    plt.show()


if __name__ == "__main__":
    main()

上記を実行する。 エポックを重ねる毎に少しずつ損失が減っていく。

$ python mnistae.py
Device: mps
Epoch [1/1000], Training Loss: 0.04848
Epoch [1/1000], Validation Loss: 0.02846
Epoch [2/1000], Training Loss: 0.02480
Epoch [2/1000], Validation Loss: 0.02163
Epoch [3/1000], Training Loss: 0.01990
Epoch [3/1000], Validation Loss: 0.01821
...
Epoch [78/1000], Training Loss: 0.00544
Epoch [78/1000], Validation Loss: 0.00538
Epoch [79/1000], Training Loss: 0.00542
Epoch [79/1000], Validation Loss: 0.00542
Epoch [80/1000], Training Loss: 0.00541
Epoch [80/1000], Validation Loss: 0.00536
Early stopping at epoch 81
Training Finished
Epoch: 75, Validation Loss: 0.00532
Test Set Evaluation - Loss: 0.00542

実行が完了すると次のような可視化が得られる。

実行結果

それぞれ、以下のような意味がある。

  • 上段はモデルの入力となった画像を表している
  • 中段はモデルのボトルネック部分において圧縮された表現を可視化したもの
  • 下段はモデルが出力した画像を表している