読者です 読者をやめる 読者になる 読者になる

CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: 多変数の関数から勾配法で最小値を探す

Mac OS X NumPy Python matplotlib 機械学習

以前、このブログで一変数の関数から勾配法で最小値を探す記事を書いた。

blog.amedama.jp

このときは題材として  f(x) = x^{2} という一変数の関数を扱った。 今回は、これを多変数の関数に拡張してみることにする。 ちなみに、この多変数というのは機械学習における多次元と同じ意味を持っている。 つまり、これができると多次元のデータセットで損失関数の出力から最小値を探すことができることを意味している。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.11.6
BuildVersion:   15G1108
$ python --version
Python 3.5.2

下準備

まずは、前回と同じように可視化に使う matplotlib と、数値計算用のライブラリである numpy をインストールしておこう。

$ pip install matplotlib numpy

多変数の関数を解析的に偏微分する

今回題材として扱う関数は  f(x_0, x_1) = x_0^{2} + x_1^{2} にする。

勾配法では最初に関数を微分する必要があった。 これは、関数が多変数になったときも変わらない。 ただし、多変数になると名前だけは偏微分と呼ばれるようになる。 また、変数が複数あるので、それぞれの関数について微分することになる。

試しに、題材とする関数を  x_0 について解析的に偏微分してみよう。 このとき、微分する対象ではない変数  x_1 は固定値と捉える。 そのため、結果は  2x_0 となる。

 \frac{ \partial f }{ \partial x_0 } = 2x_0

次のサンプルコードでは  x_0 = 3.0, x_1 = 4.0 のときの  \frac{ \partial f }{ \partial x_0 } を計算している。 とはいえ、上記の内容から結果は  2 \times 3 = 6 とすぐに分かる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-


def f(x0, x1):
    """偏微分したい関数: f(x) = x_0^2 + x_1^2"""
    return x0 ** 2 + x1 ** 2


def df_x0(x0, x1):
    """関数 f の解析的な x0 についての偏微分: f'(x) = 2x_0"""
    return 2 * x0  # x_1 は固定値と捉えて微分することで消滅しており登場しない


def main():
    grad = df_x0(3.0, 4.0)  # ぶっちゃけ x_1 は影響を与えないので何を使っても良い
    print('解析的な x0 についての偏微分: {0}'.format(grad))


if __name__ == '__main__':
    main()

実行結果は次の通り。 まあ、当たり前だね。

解析的な x0 についての偏微分: 6.0

多変数の関数から数値解法で偏微分の近似値を得る

先ほどの例では人が数式を見て解析的に偏微分したけど、これだと機械学習アルゴリズムには適用が難しい。 近似値で良いから、先ほどの偏微分をコンピュータにやらせたい。 そこで、今度は前回のブログでも扱った中央差分を元にした数値微分を適用してみることにしよう。

次のサンプルコードを見てほしい。 ここでは、先ほどと同じように  x_0 = 3.0, x_1 = 4.0 のときの  \frac{ \partial f }{ \partial x_0 } を計算している。 ポイントとしては、偏微分というのは微分に使わない変数を固定値として扱うところだ。 なので  x_14.0 に固定した関数 f_x0 を定義している。 そして、その関数を前回のブログ記事にも登場した数値微分の関数 numerical_diff で微分している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-


def f(x0, x1):
    """偏微分したい関数: f(x) = x0^2 + x1^2"""
    return x0 ** 2 + x1 ** 2


def f_x0(x0):
    """関数 f を x0 について偏微分するために x1 を固定値にした関数"""
    return f(x0, 4.0)


def numerical_diff(f, x):
    """中央差分を元に数値微分する関数"""
    h = 1e-4
    return (f(x + h) - f(x - h)) / (2 * h)


def main():
    grad = numerical_diff(f_x0, 3.0)
    print('数値微分を使った x0 についての偏微分: {0}'.format(grad))


if __name__ == '__main__':
    main()

上記を実行してみよう。

数値微分を使った x0 についての偏微分: 6.00000000000378

おお!解析的な偏微分で得られた値とほとんど同じ値 (近似値) が得られている!

NumPy を使って汎用的にしてみる

先ほどのサンプルコードでは偏微分をするために、微分に関係しない変数を固定値にした関数を新たに用意していた。 しかし、これを毎回やるのは大変なので、今度は数値微分の関数 numerical_diffを偏微分用に拡張してみよう。

次のサンプルコードでは、偏微分する関数 f の引数を変更している。 具体的には、これまで別々の引数としていた  x_0 x_1 を NumPy の配列で渡すことにしている。 また、偏微分に拡張した関数 numerical_diff には引数 i を追加している。 これは、引数 x のうち何番目の要素で偏微分するかを表すインデックスになっている。 例えば  i = 0 なら  x_0 について偏微分することを意味している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np


def f(x):
    """偏微分したい関数: f(x) = x0^2 + x1^2"""
    return x[0] ** 2 + x[1] ** 2


def numerical_diff(f, x, i):
    """中央差分を元に数値微分する関数 (偏微分)

    :param function f: 偏微分する関数
    :param numpy.ndarray x: 偏微分する引数
    :param int i: 偏微分する変数のインデックス
    """
    # 丸め誤差で無視されない程度に小さな値を用意する
    h = 1e-4
    # 偏微分する変数のインデックスにだけ上記の値を入れる
    h_vec = np.zeros_like(x)
    h_vec[i] = h
    # 数値微分を使って偏微分する
    return (f(x + h_vec) - f(x - h_vec)) / (2 * h)


def main():
    args = [3.0, 4.0]  # 偏微分に使う引数
    grad = numerical_diff(f, args, 0)  # 0 番目の要素 (x0) について偏微分する
    print('数値微分を使った x0 についての偏微分: {0}'.format(grad))


if __name__ == '__main__':
    main()

ポイントとしては偏微分する要素にだけ丸め誤差で無視されない程度に小さな値を加えて数値微分している点だ。

上記を実行してみよう。

数値微分を使った x0 についての偏微分: 6.00000000000378

先ほどと同じように上手く偏微分できていることが分かる。

偏微分を元に勾配を計算する

多変数の関数において、各変数について偏微分して得られた値をベクトルにまとめたものを勾配と呼ぶ。 次は、この勾配を計算してみよう。 とはいえ、ある変数について偏微分するやり方は先ほどの例で分かったので、あとはそれを繰り返すだけ。

次のサンプルコードでは numerical_gradient という関数で購買を計算している。 ポイントは特に無くて、各変数について淡々と偏微分するだけ。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np


def f(x):
    """偏微分したい関数: f(x) = x0^2 + x1^2"""
    return x[0] ** 2 + x[1] ** 2


def numerical_diff(f, x, i):
    """中央差分を元に数値微分する関数 (偏微分)

    :param function f: 偏微分する関数
    :param numpy.ndarray x: 偏微分する引数
    :param int i: 偏微分する変数のインデックス
    """
    # 丸め誤差で無視されない程度に小さな値を用意する
    h = 1e-4
    # 偏微分する変数のインデックスにだけ上記の値を入れる
    h_vec = np.zeros_like(x)
    h_vec[i] = h
    # 数値微分を使って偏微分する
    return (f(x + h_vec) - f(x - h_vec)) / (2 * h)


def numerical_gradient(f, x):
    """勾配を計算する関数

    勾配というのは、全ての変数について偏微分した結果をベクトルとしてまとめたものを言う。
    """
    # 勾配を入れるベクトルをゼロで初期化する
    grad = np.zeros_like(x)

    for i, _ in enumerate(x):
        # i 番目の変数で偏微分する
        grad[i] = numerical_diff(f, x, i)

    # 計算した勾配を返す
    return grad


def main():
    grad = numerical_gradient(f, np.array([3.0, 4.0]))
    print('勾配: {0}'.format(grad))


if __name__ == '__main__':
    main()

実行すると  x_0 = 3.0, x_1 = 4.0 のときの勾配が得られる。

勾配: [ 6.  8.]

題材となる関数を可視化してみる

さて、先ほどのサンプルコードで勾配を計算することができた。 とはいえ、何のために勾配を計算したのかよく分からないかもしれないので、ここで補足しておく。 先ほど計算した勾配には、それぞれの変数について偏微分したときの傾きが入っている。 これはつまり、各次元ごとのどちらに進めば値が小さくなるかという情報だ。

上記の説明だけでは分かりづらいかもしれないので、より原点に立ち返った説明もしてみよう。 例えば  x_0, x_1, y という三つの軸を持った三次元の空間をイメージしてほしい。 通常であれば変数の名前は  x, y, z になるけど、ここではあえて今回使った変数の名前にしている。 まず  x_0 x_1 が関数の入力で  y が出力だとしよう。 縦と横が  x_0 x_1 に対応していて高さが  y ということになる。 今回の問題は高さ  y が最も低くなる位置を  x_0 x_1 の平面の中から探したいということだ。

試しに、今回題材とする関数  f(x) = x_0^{2} + x_1^{2} を上記のように三次元で捉えて可視化してみることにしよう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from mpl_toolkits.mplot3d import Axes3D
from matplotlib import pyplot as plt
import numpy as np


def main():
    x0 = np.arange(-2, 2, 0.1)
    x1 = np.arange(-2, 2, 0.1)
    X0, X1 = np.meshgrid(x0, x1)
    Y = X0 ** 2 + X1 ** 2

    fig = plt.figure()
    ax = Axes3D(fig)
    ax.plot_wireframe(X0, X1, Y)

    plt.show()


if __name__ == '__main__':
    main()

上記を実行すると、次のようなグラフが得られる。 f:id:momijiame:20161217194456p:plain 上記より、最小値は  x0 = 0, x1 = 0 ということが分かる。

偏微分を元にした勾配の計算というのは、上記のグラフでいえば各点における傾きを  x_0, x_1 という、それぞれの軸ごとに計算するという意味になる。

勾配法への組み込み

ここで、前回も扱った勾配法の数式を多変数に拡張して示す。

 x_{ij + 1} = x_{ij} - \eta f_{x_i}'(x)

上記で現在の位置を  x_{ij} としたとき、より最小値に近づいたものが  x_{ij + 1} になる。  \eta は学習率と呼ばれるもので、どれくらいの勢いで最小値に近づいていくかを表している。 そして  f_{x_i}'(x) f x_i について偏微分した関数になる。

上記は、扱う変数が増えて多変数 (多次元) になったとしてもアルゴリズムの基本は全く変わらない。 単に、各次元ごとにそれぞれで勾配法を実行するだけだ。 次のサンプルコードでは初期位置を  x_0 = 1, x_0 = 2 として、学習率 0.01 ステップ数 50 回で勾配法を実行している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt
import numpy as np


def f(x):
    """偏微分したい関数: f(x) = x0^2 + x1^2"""
    return x[0] ** 2 + x[1] ** 2


def numerical_diff(f, x, i):
    """中央差分を元に数値微分する関数 (偏微分)

    :param function f: 偏微分する関数
    :param numpy.ndarray x: 偏微分する引数
    :param int i: 偏微分する変数のインデックス
    """
    # 丸め誤差で無視されない程度に小さな値を用意する
    h = 1e-4
    # 偏微分する変数のインデックスにだけ上記の値を入れる
    h_vec = np.zeros_like(x)
    h_vec[i] = h
    # 数値微分を使って偏微分する
    return (f(x + h_vec) - f(x - h_vec)) / (2 * h)


def numerical_gradient(f, x):
    """勾配を計算する関数

    勾配というのは、全ての変数について偏微分した結果をベクトルとしてまとめたものを言う。
    """
    # 勾配を入れるベクトルをゼロで初期化する
    grad = np.zeros_like(x)

    for i, _ in enumerate(x):
        # i 番目の変数で偏微分する
        grad[i] = numerical_diff(f, x, i)

    # 計算した勾配を返す
    return grad


def gradient_descent(f, initial_position, learning_rate=0.1, steps=50):
    """勾配法で最小値を求める関数

    :param function f: 最小値を見つけたい関数
    :param numpy.ndarray initial_position: 関数の初期位置
    :param float learning_rate: 学習率
    :param int steps: 学習回数
    """
    # 現在地を示すカーソル
    x = initial_position

    # 学習を繰り返す
    for _ in range(steps):
        # 現在地の勾配 (どちらにどれだけ進むべきか) を得る
        grad = numerical_gradient(f, x)
        # 勾配を元にして現在地を移動する
        x -= learning_rate * grad

    # 最終的な位置を返す
    return x


def main():
    # 勾配法を使って関数 f() の最小値を探す (初期位置は 1, 2)
    min_pos = gradient_descent(f, [1, 2])
    print('勾配法が見つけた最小値: {0}'.format(min_pos))


if __name__ == '__main__':
    main()

上記を実行した結果は次の通り。

勾配法が見つけた最小値: [  1.78405962e-05   3.56811923e-05]

最小値である  x_0 = 0, x_1 = 0 に、どちらの次元についても近づけていることが分かる。

最小値に収束していく様子を可視化してみる

先ほどの結果から、最小値に収束させることができることは分かった。 次は、どのように収束していっているのかをグラフで可視化してみよう。

次のサンプルコードでは各ステップごとの  x_0 x_1 の位置を折れ線グラフで示している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt
import numpy as np


def f(x):
    """偏微分したい関数: f(x) = x0^2 + x1^2"""
    return x[0] ** 2 + x[1] ** 2


def numerical_diff(f, x, i):
    """中央差分を元に数値微分する関数 (偏微分)

    :param function f: 偏微分する関数
    :param numpy.ndarray x: 偏微分する引数
    :param int i: 偏微分する変数のインデックス
    """
    # 丸め誤差で無視されない程度に小さな値を用意する
    h = 1e-4
    # 偏微分する変数のインデックスにだけ上記の値を入れる
    h_vec = np.zeros_like(x)
    h_vec[i] = h
    # 数値微分を使って偏微分する
    return (f(x + h_vec) - f(x - h_vec)) / (2 * h)


def numerical_gradient(f, x):
    """勾配を計算する関数

    勾配というのは、全ての変数について偏微分した結果をベクトルとしてまとめたものを言う。
    """
    # 勾配を入れるベクトルをゼロで初期化する
    grad = np.zeros_like(x)

    for i, _ in enumerate(x):
        # i 番目の変数で偏微分する
        grad[i] = numerical_diff(f, x, i)

    # 計算した勾配を返す
    return grad


def gradient_descent(f, initial_position, learning_rate=0.1):
    """勾配法で最小値を求める関数

    :param function f: 最小値を見つけたい関数
    :param numpy.ndarray initial_position: 関数の初期位置
    :param float learning_rate: 学習率
    """
    # 現在地を示すカーソル
    x = initial_position

    # 学習を繰り返す
    while True:
        # 現在地の勾配 (どちらにどれだけ進むべきか) を得る
        grad = numerical_gradient(f, x)
        # 勾配を元にして現在地を移動する
        x -= learning_rate * grad
        # 現在の位置を返す
        yield np.copy(x)


def main():
    g = gradient_descent(f, [1, 2])

    x = range(50)
    y = np.array([next(g) for _ in x])

    plt.plot(x, y[:, 0], label='f_x0(x)')
    plt.plot(x, y[:, 1], label='f_x1(x)')
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 f:id:momijiame:20161217195558p:plain それぞれの軸で最小値に収束していく様子が見て取れる。

まとめ

  • 勾配法は多変数の関数でも使える
  • ただし傾きを得るときの計算が偏微分になる
  • 偏微分は解析的に計算する必要はなく数値解法での近似値が使える
  • 各変数について偏微分した結果のベクトルを勾配という
  • 勾配を元に各次元ごとに最小値を探すことができる

参考文献

ゼロから作るDeep Learning ―Pythonで学ぶディープラーニングの理論と実装

ゼロから作るDeep Learning ―Pythonで学ぶディープラーニングの理論と実装

Ruby: bundler のグローバル設定にデフォルト値を保存する

Bundler は Ruby のパッケージ管理 (vendoring) をするためのパッケージ。 このブログでも、以前 Ruby の開発環境を整える記事の中で取り扱ったことがある。

blog.amedama.jp

ただ、こいつはデフォルトだとパッケージをシステム環境にインストールしようとする。 もし異なる場所 (ディレクトリ) にインストールしたいときは --path オプションで指定しなきゃいけない。

なので、これまでは次のようなエイリアスをシェルの設定に追加して使っていた。

alias bi="bundle install --path=vendor/bundle"

こうすると Gemfile のあるディレクトリで bi と入力すると上記のコマンドが実行される。

$ bi

ただ、上記みたいなエイリアスを貼らなくても楽をする方法があるよ、というのが今回の趣旨。 具体的には、グローバルの設定にインストール先のパスをあらかじめ指定しておく。

今回使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.11.6
BuildVersion:   15G1108
$ ruby --version
ruby 2.3.1p112 (2016-04-26 revision 54768) [x86_64-darwin15]
$ bundler --version
Bundler version 1.13.6

下準備

まずは gem コマンドを使ってシステム環境に Bundler をインストールする。

$ gem install bundler

続いて適当な作業ディレクトリで init サブコマンドを実行する。

$ bundle init

するとプロジェクトで使用するパッケージを羅列する Gemfile というファイルができる。

$ cat Gemfile
# frozen_string_literal: true
source "https://rubygems.org"

# gem "rails"

ここに、ひとまずテスト用として sqlite3 を追加しておこう。 これは別の gem でも構わない。

$ echo "gem 'sqlite3'" >> Gemfile
$ cat Gemfile
# frozen_string_literal: true
source "https://rubygems.org"

# gem "rails"
gem 'sqlite3'

通常の使い方

まずは、いつも通りの使い方から。 install サブコマンドに --path オプションを指定して実行しよう。

$ bundle install --path=vendor/bundle
Fetching gem metadata from https://rubygems.org/...........
Fetching version metadata from https://rubygems.org/.
Resolving dependencies...
Installing sqlite3 1.3.12 with native extensions
Using bundler 1.13.6
Bundle complete! 1 Gemfile dependency, 2 gems now installed.
Bundled gems are installed into ./vendor/bundle.

すると、オプションで指定した vendor/bundle ディレクトリ以下に gem がインストールされる。

$ ls vendor
bundle

実は、上記を実行すると次のように .bundle/config という名前でプロジェクトの設定ファイルが保存されている。

$ cat .bundle/config
---
BUNDLE_PATH: "vendor/bundle"
BUNDLE_DISABLE_SHARED_GEMS: "true"

使われる内容は config サブコマンドで確認することもできる。

$ bundler config
Settings are listed in order of priority. The top value will be used.
path
Set for your local app (/Users/amedama/Documents/temporary/sample/.bundle/config): "vendor/bundle"

disable_shared_gems
Set for your local app (/Users/amedama/Documents/temporary/sample/.bundle/config): "true"

上記の確認ができたら、一旦設定とインストールされたディレクトリを削除しておこう。

$ rm -rf vendor .bundle

グローバル設定にデフォルト値を保存する

では、次に上記のような --path オプションを毎回指定しなくても良い方法について。

これには config サブコマンドで --global に対して path の設定を入れよう。

$ bundler config --global path vendor/bundle

すると、ユーザのホームディレクトリに Bundler の設定ファイルができる。

$ bundler config
Settings are listed in order of priority. The top value will be used.
path
Set for the current user (/Users/amedama/.bundle/config): "vendor/bundle"

デフォルト値が設定できたところで --path ディレクトリを指定せずに install サブコマンドを実行してみよう。

$ bundler install

すると --path でディレクトリを指定しなくても vendor/bundle ディレクトリにインストールされた。

$ ls vendor
bundle

これで毎回 --path オプションを指定する必要がなくなった。

グローバル設定をオーバーライドする

ちなみに、グローバル設定はプロジェクトのローカル設定でオーバーライドすることもできる。

試しに --local に対して path を別の値で指定してみよう。

$ bundler config --local path foo/bar
You are replacing the current local value of path, which is currently nil

すると、グローバル設定とローカル設定の両方がコンフィグとして見えるようになる。

$ bundler config
Settings are listed in order of priority. The top value will be used.
path
Set for your local app (/Users/amedama/Documents/temporary/sample/.bundle/config): "foo/bar"
Set for the current user (/Users/amedama/.bundle/config): "vendor/bundle"

disable_shared_gems
Set for your local app (/Users/amedama/Documents/temporary/sample/.bundle/config): "true"

この状態で install コマンドを実行してみよう。

$ bundler install

するとローカル設定で指定したディレクトリに gem がインストールされている。

$ ls foo
bar

このように、ローカル設定でグローバル設定をオーバーライドできることが分かった。

まとめ

  • Bundler で Ruby gems のパッケージ管理 (vendoring) ができる
  • ただ、毎回 --path オプションを指定するのがめんどい
  • そんなときはグローバル設定にデフォルト値を設定すると楽になる

Python: 勾配法で関数の最小値を探す

Mac OS X Python 機械学習

勾配法はニューラルネットワークなどの機械学習アルゴリズムの中で、学習するときに使われているアルゴリズム。 このアルゴリズムを使うと、与えられた関数の最小値 (または最大値) を探すことができる。 例えば教師データと現在の出力の誤差を計算する損失関数を与えれば、その最小値を探すことで教師データに出力を近づけることができる。 ただし、真の最小値 (または最大値) が見つかることが保証されているわけではなく、局所的な最適解に陥る恐れはある。

今回は Python を使って単純な勾配法を実装してみることにする。 使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.11.6
BuildVersion:   15G1108
$ python --version
Python 3.5.2

パッケージのインストール

まずは可視化のための matplotlib と、数値計算系をするならこれがないとお話にならない numpy をインストールしておく。

$ pip install matplotlib numpy

題材にする関数

ひとまず、今回は  f(x) = x^{2} な関数を題材にしよう。 この関数の最小値を勾配法を使って探してみることにする。

まずは最小値を求めたい関数をグラフに図示してみることにする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from matplotlib import pyplot as plt


def f(x):
    """最小値を求めたい関数"""
    return x ** 2


def main():
    x = np.arange(-2, 2, 0.1)

    y = f(x)
    plt.plot(x, y, label='f(x)')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記のプログラムを実行すると、次のようなグラフが得られる。 f:id:momijiame:20161210155342p:plain

まあ、どう見ても最小値は  f(0) のときなことが分かる。

一階導関数を求める

先ほどはグラフから最小値をあっさりと判断できたけど、それは可視化した上で人間が判断しているため。 コンピュータに最小値を探させるには、別の方法が必要になる。 それに、今回は変数がひとつだけなので良いものの、みっつ以上に増えれば可視化も難しくなる。

そこで登場するのが今回の勾配法なわけだけど、これにはまず最小値を求めたい関数を微分する必要がある。 ちなみに、このやり方であれば人間の判断は必要なくなって、変数が増えたときにも偏微分で対応できる。

ではまず、先ほどの関数  f(x) = x^{2} を微分したものをグラフにしてみよう。 これは  f'(x) = 2x ということが分かる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from matplotlib import pyplot as plt


def f(x):
    """最小値を求めたい関数"""
    return x ** 2


def df(x):
    """最小値を求めたい関数の一階導関数"""
    return 2 * x


def main():
    x = np.arange(-2, 2, 0.1)

    y = f(x)
    plt.plot(x, y, label='f(x)')

    y_dash = df(x)
    plt.plot(x, y_dash, label='f\'(x)')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

最終的には微分までコンピュータにやらせるけど、ここではあえて人間が微分している。

上記を実行すると、次のようなグラフが得られる。 f:id:momijiame:20161210160243p:plain

青色の線が最小値を求めたい関数で、緑色の線が一階導関数になっている。 最小値である  f(0) を境にして、一階導関数の出力がプラスとマイナスに切り替わっていることが見て取れる。

f:id:momijiame:20161210161205p:plain

勾配法では、この最小値を境にして一階導関数のプラスとマイナスが切り替わるところが重要になる。

一階導関数の符号を反転してみると?

先ほどの一階導関数をそのままプロットしたグラフでは、最小値よりも大きい範囲ではプラスに、小さい範囲ではマイナスになっていた。 これの符号を反転してみると、どうなるだろうか?

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from matplotlib import pyplot as plt


def f(x):
    """最小値を求めたい関数"""
    return x ** 2


def df(x):
    """最小値を求めたい関数の一階導関数"""
    return 2 * x


def main():
    x = np.arange(-2, 2, 0.1)

    y = f(x)
    plt.plot(x, y, label='f(x)')

    y_dash = df(x)
    # 一階導関数の符号を反転させる
    plt.plot(x, -y_dash, label='-f\'(x)')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

上記を実行すると、次のようなグラフが得られる。 (グラフでは凡例にマイナス符号を付けるのを忘れていた)

f:id:momijiame:20161210161748p:plain

今度のグラフでは一階導関数が最小値を境にして大きい範囲ではマイナス、小さい範囲ではプラスになっている。 すると、符号を反転した一階導関数の出力は、最小値に近づけるために必要な移動方向と一致することが分かる。

f:id:momijiame:20161210162141p:plain

つまり、最小値は符号を反転した一階導関数を出力が指し示す方向にあるということになる。 あとは、その指し示す方向を頼りに徐々に移動していけば、いつかは最小値にたどり着ける。

とはいえ、もちろんたどり着いた先が局所的な最適解で、実際の最小値が別にある可能性はある。 これはようするに、関数の出力がぐねぐねしていて、ところどころに凹みがあると、そこにハマってしまうということ。

勾配法

ここで勾配法のアルゴリズムを数式で書いてみよう。

 x_{i + 1} = x_i - \eta f'(x_i)

上記で現在の位置を  x_i としたとき、より最小値に近づいたものが  x_{i + 1} になる。  \eta は学習率と呼ばれるもので、どれくらいの勢いで最小値に近づいていくかを表している。 そして  f'(x_i) は最小値を探したい関数の一階導関数になっている。

勾配法では、上記の作業を複数回繰り返すことで最小値を探す。 次のサンプルコードでは gradient_descent() という関数で勾配法を実装している。 learning_rate が学習率  \eta に対応していて steps は上記の工程を何回繰り返すかを表す。 初期値は 1 に設定した上で最小値をそこから探していく。

#!/usr/bin/env python
# -*- coding: utf-8 -*-


def f(x):
    """最小値を求めたい関数"""
    return x ** 2


def df(x):
    """最小値を求めたい関数の一階導関数"""
    return 2 * x


def gradient_descent(df, initial, learning_rate=0.1, steps=50):
    """一階導関数から勾配法で最小値を求める関数"""
    # 現在地を示すカーソル
    x = initial

    # 所定の回数を繰り返す
    for _ in range(steps):
        # 現在地の勾配 (どちらにどれだけ進むべきか) を得る
        grad = df(x)
        # 勾配を元にして現在地を移動する
        x -= learning_rate * grad

    # 最終的な位置を返す
    return x


def main():
    print(gradient_descent(df, 1))


if __name__ == '__main__':
    main()

上記を実行すると次のような出力が得られる。

1.4272476927059604e-05

値は  1.4272476927059604 * 10^{-5} なので 0 に近いことが分かる。

勾配法の learning_ratesteps はハイパーパラメータと呼ばれていて人間が適切に選ぶ必要がある。 これは大きくても小さくても上手くいかない。 例えば learning_rate が大きすぎれば移動する量が大きすぎていつまでも収束せず、小さすぎれば最小値になかなか近づかない。 また steps は大きすぎれば計算量が大きくなりすぎるし、小さすぎれば最小値に近づくまでに終了してしまう。

最小値に収束していく様子を可視化してみる

先ほどは最小値に収束した結果だけを出力にしたけど、次は収束していく様子を可視化してみる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt


def f(x):
    """最小値を求めたい関数"""
    return x ** 2


def df(x):
    """最小値を求めたい関数の一階導関数"""
    return 2 * x


def gradient_descent(df, initial, learning_rate=0.1):
    """一階導関数から勾配法で最小値を求める関数"""
    # 現在地を示すカーソル
    x = initial

    while True:
        # 現在地の勾配 (どちらにどれだけ進むべきか) を得る
        grad = df(x)
        # 勾配を元にして現在地を移動する
        x -= learning_rate * grad

        yield x


def main():
    g = gradient_descent(df, 1)

    x = range(50)
    y = [next(g) for _ in x]

    plt.plot(x, y)
    plt.show()


if __name__ == '__main__':
    main()

上記を実行すると、次のようなグラフが得られる。

f:id:momijiame:20161210165835p:plain

工程を繰り返すごとに、初期値である 1 付近から、徐々に最小値である 0 に近づいていくことが見て取れる。

微分をコンピュータにやらせる

ここまでで勾配法を使って最小値を探すことができることは分かった。 しかし、先ほどの例ではまだ人間が最小値を探したい関数を微分していた。 これをどうにかしないことには機械学習には使えない。

そこで、まずは微分の定義から見直してみよう。

 \begin{eqnarray}f'(x) = \frac{ df(x) }{ dx } = \lim_{ h \to 0 } \frac{ f(x + h) - f(x) }{ h }\end{eqnarray}

上記は  f(x) に、限りなくゼロに近づけた小さな値  h を加えたときの変化量 (傾き) を得る、という意味になっている。 ただし、これをそのまま実世界のコンピュータに適用することはできない。

まず、限りなくゼロに近づけた値は IEEE754 の小数を実装したコンピュータでは正しく扱うことができない。 まず、あまりにも小さい値は丸め誤差で無視されてしまう。

>>> 1e-100
1e-100
>>> 1.0 + 1e-100
1.0

そのため、定義どおりに限りなくゼロに近づけたような値ではなく、丸め誤差で無視されない程度に小さな値を使うことになる。

さらに、限りなくゼロに近づけた値が使えないために生じる問題がある。 丸め誤差で無視されない程度に小さな値  h f(x) に加えて傾きを求めるとしよう。 すると、実際に傾きを求めたい場所  x からは  h だけ位置ズレてしまう。

そこで、上記の問題を解決するために中央差分というものを使って変化量を求める。 これは、求めたい場所  x の前後  \pm h の差分を使って傾きを求めるというもの。 これなら傾きを求める場所のズレは前後  \pm h で打ち消されて  x そのものになる。

試しに、上記で説明した中央差分を使った微分で一階導関数を求めてみよう。 次のサンプルコードでは numerical_diff() という関数で中央差分を元にした微分を実装している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from matplotlib import pyplot as plt


def f(x):
    """最小値を求めたい関数"""
    return x ** 2


def numerical_diff(f, x):
    """中央差分を元に数値微分する関数"""
    h = 1e-4
    return (f(x + h) - f(x - h)) / (2 * h)


def main():
    x = np.arange(-2, 2, 0.1)

    y = f(x)
    plt.plot(x, y, label='f(x)')

    y_dash = numerical_diff(f, x)
    plt.plot(x, y_dash, label='f\'(x)')

    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()

ちなみに、先ほどのように人間が数式から手計算で微分することを解析的な微分と呼ぶ。 それに対し、上記サンプルコードのようなコンピュータを使った近似的な微分を数値微分という。

上記を実行すると次のグラフが得られる。

f:id:momijiame:20161210173103p:plain

グラフで見る限り解析的な微分したときと同じ結果が得られている。

数値微分を勾配法に組み込んでみる

今回の記事の最終形態として、先ほどの勾配法に数値微分を組み込んでみよう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-


def f(x):
    """最小値を求めたい関数"""
    return x ** 2


def numerical_diff(f, x):
    """中央差分を元に数値微分する関数"""
    h = 1e-4
    return (f(x + h) - f(x - h)) / (2 * h)


def gradient_descent(f, initial, learning_rate=0.1, steps=50):
    """一階導関数から勾配法で最小値を求める関数"""
    # 現在地を示すカーソル
    x = initial

    # 所定の回数を繰り返す
    for _ in range(steps):
        # 現在地の勾配 (どちらにどれだけ進むべきか) を得る
        grad = numerical_diff(f, x)
        # 勾配を元にして現在地を移動する
        x -= learning_rate * grad

    # 最終的な位置を返す
    return x


def main():
    print(gradient_descent(f, 1))


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。

1.4272476927065283e-05

解析的な微分を使っていたときとわずかに値は異なるものの、ほぼ同じ値が得られている。

まとめ

  • 勾配法を使って関数の最小値を探すことができる
  • 勾配法では符号を反転させた関数の一階導関数を元に最小値を探す
  • 一階導関数はコンピュータで近似値を計算できる

参考文献

ゼロから作るDeep Learning ―Pythonで学ぶディープラーニングの理論と実装

ゼロから作るDeep Learning ―Pythonで学ぶディープラーニングの理論と実装

Python: fasteners の便利な排他ロックを試す

Mac OS X Python

Python には標準ライブラリとして、いくつか排他ロックの実装が用意されている。 例えば threading モジュールの Lock オブジェクトなどは、その代表といえる。

しかしながら、標準では用意されていないものもある。 例えばプロセス間の排他ロックやリードライトロックは標準ライブラリに用意されていない。 そのため、例えばもし自前でプロセス間の排他ロックを用意するとしたら fcntl モジュールなどを使って書くことになる。

とはいえ、排他ロックの実装というのはバグを作り込みやすい。 そもそも、マルチスレッドやマルチプロセスで競合しないプログラムを作ること自体が深く注意を払わなければできない作業だ。 その上、排他ロックの機構まで自前で用意するとしたら、二重でリスクを負うことになる。

fasteners

前置きが長くなったけど fasteners は前述したような排他ロックの機構を提供してくれるパッケージ。 今回は、このパッケージが用意している排他ロックを使ってみる。

公式ドキュメントは以下にある。

Welcome to Fasteners’s documentation! — Fasteners 0.14.1 documentation

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.11.6
BuildVersion:   15G1108
$ python --version
Python 3.5.2

インストール

fasteners はサードパーティ製のパッケージなので pip コマンドを使ってインストールする。

$ pip install fasteners

プロセス間の排他ロック

まず最初にプロセス間の排他ロックから。 これは、複数のプロセスが何らかのリソースを共有するシチュエーションで用いることになる。 複数のプロセスから共有リソースにアクセスを試みたときに、実際にアクセスすることができるプロセスは一つだけに制限するのに使う。 排他ロックを取得できたプロセスだけが、共有リソースにアクセスする権利を持つということ。

ここから始める検証では、テストのためにターミナルで Python の REPL をふたつ開いて実施しよう。 Python の REPL がふたつ、ということはそれぞれにプロセスが立ち上がるのでプロセス間の排他ロックの検証ができる。 ちなみに、ここで紹介するのはあくまでもマルチプロセスでの排他ロックなので、マルチスレッドでの排他ロックには使ってはいけない。

まず、ひとつ目の REPL で fasteners パッケージをインポートしよう。

>>> import fasteners

そして、プロセス間の排他ロックを司る InterProcessLock クラスをインスタンス化する。

>>> lock = fasteners.InterProcessLock('/var/tmp/lockfile')

このとき渡しているのは、ロックに用いるファイルを作るためのパスになっている。 つまり、各プロセスはこのロックファイルの状態を調べることで、別のプロセスが既にロックを取得していないかを確認し合うことになる。

そして、実際にロックをかけるには InterProcessLock#acquire() メソッドを呼び出す。 このメソッドはロックを取得できたか否かを真偽値で返してくる。

>>> lock.acquire()
True

上記では True が返っているので正常にロックが取れたことを意味している。

それでは、続いて別のターミナルで REPL を開いて同じようにロックの取得を試みてみよう。

>>> import fasteners
>>> lock = fasteners.InterProcessLock('/var/tmp/lockfile')

ふたつ目の REPL でもロックを取ろうとすると、そこでブロックする。 これは、ひとつ目の REPL が既にロックを取っているためだ。

>>> lock.acquire()
...

この状態で、ひとつ目の REPL でロックを開放してみよう。 ロックを開放するには InterProcessLock#release() メソッドを呼び出す。

>>> lock.release()

すると、ふたつ目の REPL でブロックしていた InterProcessLock#acquire() メソッドが返ってくるはず。

>>> lock.acquire()
True

返り値として True が返ってきていて、これでふたつ目の REPL にロックが移ったことが分かる。

後始末として、ひとまず、ふたつ目の REPL もロックを開放しておこう。

>>> lock.release()

ちなみに、ロックで怖いことの一つが誰かがロックを握ったまま手放さなくなることだ。 fasteners は内部的に fcntl モジュールを使って、そこらへんをケアしているので大丈夫そう。 試しに Python のプロセスを kill -KILL コマンドなどで強制終了してもロックはちゃんと開放される。

ブロックさせたくないとき

先ほどは別のプロセスがロックを取っている間は、別のプロセスがロックを取ろうとするとブロックしていた。 とはいえ、ロックを取ろうと試みて、もしだめなら別の仕事をするなりそのまま終了したいというような状況もあるはず。

そんなときは InterProcessLock#acquire() メソッドの引数である blocking に False を指定しよう。 こうすると、もしロックが取れなかったときは即座にリターンされる。

まず、ひとつ目の REPL でロックを取る。

>>> lock.acquire(blocking=False)
True

そして、ふたつ目の REPL でロックを取ろうと試みる。 このとき blocking に False を指定していると即座にリターンされる。 そして、返り値はロックが取れなかったことを表す False になっている。

>>> lock.acquire(blocking=False)
False

後片付けとしてロックを開放しておくのは忘れずに。

>>> lock.release()

コンテキストマネージャとして使う

ちなみに、ここまで使ってきた InterProcessLock クラスのインスタンスはコンテキストマネージャとしても動作する。 こうすると、コンテキストマネージャのブロック内がひとつのプロセスだけで実行されるようになる。 ただし、コンテキストマネージャとして使うときは blocking パラメータを渡すことはできないようだ。

>>> with fasteners.InterProcessLock('/var/tmp/lockfile'):
...     # do something
...     pass
...

この方がロックの開放漏れが起きづらくて良い。

上記のように使わないときは、必ずロックが開放されるように次のように書く必要があるはず。

>>> lock = fasteners.InterProcessLock('/var/tmp/lockfile')
>>> lock.acquire()
True
>>> try:
...     # do something
...     pass
... finally:
...     lock.release()
... 

デコレータとして使う

また InterProcessLock クラスのインスタンスはデコレータとしても使うことができる。 この場合はデコレータで修飾された関数のブロック内がロックを取得できたひとつのプロセスだけで実行されることになる。

>>> @fasteners.interprocess_locked('/var/tmp/lockfile')
... def do_something():
...     # do something
...     pass
...

ブロックを抜けるときは自動的にロックが開放されるので、これもロックの開放漏れが起こりづらいだろう。

リードライトロック

fasteners の、もうひとつの便利な排他ロックがリードライトロックだ。

通常の排他ロックの場合、実施する操作が読み込みだろうと書き込みだろうと関係ない。 ただ単に、排他ロックを取得できたスレッドないしプロセスだけが共有リソースにアクセスする権利を得る。 これは先ほど紹介したプロセス間の排他ロックも同様だった。

しかし、実際には読み込みだけは並列にアクセスできるようにしたい場面も多い。 なぜなら、読み込み操作は副作用のない処理だから。 副作用がなければ、複数のスレッドないしプロセスが並列にアクセスしても共有リソースが不整合を起こす心配はない。

そのため、リードライトロックでは、読み込みと書き込みが非対称な関係になっている。 読み取りについては複数のスレッドないしプロセスから並列でアクセスできる。 しかし、書き込みについてはひとつのスレッドないしプロセスだけで排他的に処理されるようになっている。

ちなみに、リードライトロック自体はスレッドやプロセスに関係のない概念となっている。 しかしながら fasteners のリードライトロックはスレッドにのみ対応している点には留意が必要だ。

また、リードライトロックを Python の標準ライブラリに追加しようという動きもあるようだ。 とはいえ、チケットを見る限り当初は Python 3.4 を目指して作業されていたものの、最近は進展が見られないようだ。

Issue 8800: add threading.RWLock - Python tracker

前置きが長くなったけど次に fasteners のリードライトロックを使ったサンプルプログラムを示す。 具体的にはリードライトロックを司る ReaderWriterLock クラスのインスタンスを使う。 ReaderWriterLock#read_lock() で読み込みロックを、ReaderWriterLock#write_lock() で書き込みロックを取得できる。 前述したように ReaderWriterLock#read_lock() は同時に複数のスレッドが取得できる。 それに対し ReaderWriterLock#write_lock() はひとつのスレッドだけが取得できる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import threading
import random

import fasteners


def reader(lock):
    # 読み込みロックをかける (読み込みだけは同時にできる)
    with lock.read_lock():
        thread_id = threading.get_ident()
        print('Thread {thread_id} is reading'.format(thread_id=thread_id))
        time.sleep(2)


def writer(lock):
    # 書き込みロックをかける (他の読み書きは排他制御される)
    with lock.write_lock():
        thread_id = threading.get_ident()
        print('Thread {thread_id} is writing'.format(thread_id=thread_id))
        time.sleep(4)


def main():
    lock = fasteners.ReaderWriterLock()

    # 読み込みスレッドを 6 つ用意する
    read_threads = [threading.Thread(target=reader, args=(lock,))
                    for _ in range(6)]
    # 書き込みスレッドを 4 つ用意する
    write_threads = [threading.Thread(target=writer, args=(lock,))
                     for _ in range(4)]

    # 一つにまとめてシャッフルする
    threads = read_threads + write_threads
    random.shuffle(threads)

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドが処理を終えるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

読み込みスレッドを 6 つ、書き込みスレッドを 4 つ用意して起動される順番をシャッフルした上で実行している。

上記を実行した結果の一例がこちら。 書き込みスレッドの処理はひとつずつ実行されるが、読み込みスレッドがロックを取得すると別の読み込みスレッドも続々とロックを取得して処理し始めることが分かる。

$ python rwlock.py
Thread 123145307557888 is writing
Thread 123145328578560 is reading
Thread 123145318068224 is reading
Thread 123145333833728 is reading
Thread 123145312813056 is reading
Thread 123145323323392 is reading
Thread 123145339088896 is reading
Thread 123145344344064 is writing
Thread 123145349599232 is writing
Thread 123145354854400 is writing

別のパターン。 こちらは最初にロックを取得したのが読み込みスレッドだったので、読み込みスレッドの処理が最初にいっぺんに実行されている。

$ python rwlock.py
Thread 123145307557888 is reading
Thread 123145312813056 is reading
Thread 123145323323392 is reading
Thread 123145328578560 is reading
Thread 123145333833728 is reading
Thread 123145349599232 is reading
Thread 123145318068224 is writing
Thread 123145339088896 is writing
Thread 123145344344064 is writing
Thread 123145354854400 is writing

ロックデコレータ

ここからは「そんなに便利か?」という感じではあるけど fasteners にあるロックデコレータの機能を紹介していく。

まずは @fasteners.locked デコレータを使うと、そのメソッドが排他に処理されるというもの。 これには、まずクラスの中で self._lock という名前でロックオブジェクトを用意しておく必要がある。 デコレータはメソッドからインスタンスのロックオブジェクトを取得して自動的にロックをかけてくれる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time

import fasteners


class SharedAmongThreads(object):
    '''スレッド間で共有されるオブジェクト'''

    def __init__(self):
        self._lock = threading.Lock()

    @fasteners.locked
    def have_to_thread_safe(self):
        '''マルチスレッドで呼び出されるためスレッドセーフである必要があるメソッド'''
        # do something
        thread_id = threading.get_ident()
        print('Thread {thread_id} is doing'.format(thread_id=thread_id))
        time.sleep(1)


def main():
    # スレッド間で共有するインスタンスを作る
    obj = SharedAmongThreads()

    # スレッド群を用意する
    threads = [threading.Thread(target=obj.have_to_thread_safe)
               for _ in range(5)]

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドの処理が終わるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

上記を実行した結果がこちら。 実際に実行してみると一秒ごとに処理が進んでいく様子が見て取れる。

$ python lockdeco.py
Thread 123145307557888 is doing
Thread 123145312813056 is doing
Thread 123145318068224 is doing
Thread 123145323323392 is doing
Thread 123145328578560 is doing

まあ、とはいえ fasteners の機能を使わなくてもコンテキストマネージャを使えば十分可読性は確保できそうな気がする。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time


class SharedAmongThreads(object):
    '''スレッド間で共有されるオブジェクト'''

    def __init__(self):
        self._lock = threading.Lock()

    def have_to_thread_safe(self):
        '''マルチスレッドで呼び出されるためスレッドセーフである必要があるメソッド'''
        with self._lock:
            # do something
            thread_id = threading.get_ident()
            print('Thread {thread_id} is doing'.format(thread_id=thread_id))
            time.sleep(1)


def main():
    # スレッド間で共有するインスタンスを作る
    obj = SharedAmongThreads()

    # スレッド群を用意する
    threads = [threading.Thread(target=obj.have_to_thread_safe)
               for _ in range(5)]

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドの処理が終わるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

同様に、リストの中にロックオブジェクトを複数入れておくと @fasteners.locked() デコレータで同時にロックを取得してくれる。 ロックオブジェクトに使うメンバの名前が _lock 以外のときはデコレータの lock 引数でメンバ名を指定する。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time

import fasteners


class SharedAmongThreads(object):
    '''スレッド間で共有されるオブジェクト'''

    def __init__(self):
        self._locks = [threading.Lock(), threading.Lock()]

    @fasteners.locked(lock='_locks')
    def have_to_thread_safe(self):
        '''マルチスレッドで呼び出されるためスレッドセーフである必要があるメソッド'''
        # do something
        thread_id = threading.get_ident()
        print('Thread {thread_id} is doing'.format(thread_id=thread_id))
        time.sleep(1)


def main():
    # スレッド間で共有するインスタンスを作る
    obj = SharedAmongThreads()

    # スレッド群を用意する
    threads = [threading.Thread(target=obj.have_to_thread_safe)
               for _ in range(5)]

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドの処理が終わるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

まあ、とはいえ上記も最近の Python ならコンテキストマネージャを横に並べるだけで事足りる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time


class SharedAmongThreads(object):
    '''スレッド間で共有されるオブジェクト'''

    def __init__(self):
        self._locks = [threading.Lock(), threading.Lock()]

    def have_to_thread_safe(self):
        '''マルチスレッドで呼び出されるためスレッドセーフである必要があるメソッド'''
        with self._locks[0], self._locks[1]:
            # do something
            thread_id = threading.get_ident()
            print('Thread {thread_id} is doing'.format(thread_id=thread_id))
            time.sleep(1)


def main():
    # スレッド間で共有するインスタンスを作る
    obj = SharedAmongThreads()

    # スレッド群を用意する
    threads = [threading.Thread(target=obj.have_to_thread_safe)
               for _ in range(5)]

    try:
        # スレッドを起動する
        for t in threads:
            t.start()
    finally:
        # 全てのスレッドの処理が終わるまで待つ
        for t in threads:
            t.join()


if __name__ == '__main__':
    main()

まとめ

  • fasteners を使うとプロセス間の排他ロックやリードライトロックといった便利な排他ロックが使える

Python: freezegun で時刻のテストを楽に書く

Mac OS X Python

時刻周りの処理はバグが混入しやすい上にテストが書きづらくて面倒くさい。 今回は、そんな面倒な時刻のテストを楽に書けるようになる freezegun というパッケージを使ってみる。 この freezegun というパッケージを使うと Python の標準ライブラリの datetime から得られる現在時刻を指定したものに差し替えることができる。

使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.11.6
BuildVersion:   15G1108
$ python --version
Python 3.5.2

時刻をテストするときの面倒くささ

時刻周りの処理をテストをするときは、当然ながら色々な時刻を使ってテストがしたい。 とはいえ、そのためだけにシステムの時刻を変更しながらテストを走らせるわけにもいかないだろう。 そこで、大抵の場合は時刻を取得するところをモックに置き換えてテストすることになるはず。 ただ、Python の datetime はモックアウトが意外とやりにくい。

これは何故かというと datetime の提供する関数がちょいちょい C 拡張モジュールとして書かれているため。 C 拡張モジュールで書かれていると標準ライブラリの unittest.mock で置き換えることができない。

例えば、次のように now() 関数をピンポイントでモックに置き換えようとすると例外になってしまう。

>>> from unittest import mock
>>> with mock.patch('datetime.datetime.now') as now:
...     now.return_value = datetime(2015, 10, 21)
...     from datetime import datetime
...     datetime.now()
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/amedama/.pyenv/versions/3.5.2/lib/python3.5/unittest/mock.py", line 1312, in __enter__
    setattr(self.target, self.attribute, new_attr)
TypeError: can't set attributes of built-in/extension type 'datetime.datetime'

そのため datetime オブジェクトを丸ごとモックに置き換える必要がある。

>>> from unittest import mock
>>> import datetime
>>> fake_dt = datetime.datetime(2015, 10, 21)
>>> with mock.patch('datetime.datetime') as dt:
...     dt.now.return_value = fake_dt
...     from datetime import datetime
...     datetime.now()
...
datetime.datetime(2015, 10, 21, 0, 0)

ただ、これだと datetime オブジェクトの別のメソッドを使いたいときなんかに困る。 その都度、書き換えたり書き戻したりが必要になって、うーん…という感じ。

freezegun を使う

そこで freezegun を使うと、そういった手間がかからなくなる。

パッケージのインストールは pip コマンドから。

$ pip install freezegun

特定の時刻を返すようにする

freezegun では freezegun.freeze_time() で datetime モジュールの関数が特定の時刻を返すように置き換えることができる。

例えば以下はその API をデコレータとして使うパターン。 デコレータで修飾された関数の中でのみ時刻が置き換わる。 サンプルコードでは datetime.now() の内容を見ている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import freezegun
from datetime import datetime


@freezegun.freeze_time('2015-10-21')
def main():
    print(datetime.now())


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python freeze.py
2015-10-21 00:00:00

見事に取得できる時刻が置き換わっている。 ちなみに、この日付はバック・トゥ・ザ・フューチャーのアレ。

上記では日付までしか指定しなかったけど、もちろん時刻まで入れられる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import freezegun
from datetime import datetime


def main():
    with freezegun.freeze_time('2015-10-21 12:34:56'):
        print(datetime.now())


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。

$ python freeze.py
2015-10-21 12:34:56

ちなみに、引数には置き換えたい時刻の入った datetime オブジェクトを渡しても良い。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import freezegun
from datetime import datetime


dt = datetime(2015, 10, 21, 12, 34, 56)


@freezegun.freeze_time(dt)
def main():
    print(datetime.now())


if __name__ == '__main__':
    main()

結果については先ほどと同じ。

特定の時刻を返すようにする (コンテキストマネージャ)

先ほどはデコレータを使って時刻を指定した。 同じ API はコンテキストマネージャとしても使うことができる。

以下はコンテキストマネージャとして使ったパターン。 この場合は、コンテキストマネージャのブロック内でだけ時刻が置き換わる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import freezegun
from datetime import datetime


def main():
    with freezegun.freeze_time('2015-10-21'):
        print(datetime.now())


if __name__ == '__main__':
    main()

実行結果についてはデコレータを使った場合と変わらないので省略する。

特定の時刻を返すようにする (素)

デコレータとしてもコンテキストマネージャとしても使わない場合は、こんな感じ。 とはいえ、まあこれはやらないかな。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import freezegun
from datetime import datetime


def main():
    freezer = freezegun.freeze_time('2015-10-21')
    freezer.start()
    try:
        print(datetime.now())
    finally:
        freezer.stop()


if __name__ == '__main__':
    main()

実行結果は同じ。

特定の時刻から時間をずらしていく

テストをするときは一旦基準となる時刻に規正してから、特定の処理をした後にずらしたいというニーズもあると思う。 freezegun を使えば、それも簡単にできる。

次のサンプルコードでは tick() を使って時刻を timedelta オブジェクトの分だけずらしたり move_to() を使って時刻を切り替えている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import freezegun
from datetime import datetime
from datetime import timedelta


def main():
    with freezegun.freeze_time('2015-10-21 00:00:00') as freeze_datetime:
        print(datetime.now())

        # 時間を 1 秒進める
        freeze_datetime.tick()
        print(datetime.now())

        # 時間を 1 分進める
        freeze_datetime.tick(delta=timedelta(minutes=1))
        print(datetime.now())

        # 特定の時間に移す
        freeze_datetime.move_to('2000-01-01 00:00:00')
        print(datetime.now())


if __name__ == '__main__':
    main()

上記を実行すると、次のようになる。

$ python freeze.py
2015-10-21 00:00:00
2015-10-21 00:00:01
2015-10-21 00:01:01
2000-01-01 00:00:00

ユニットテストで使う

ここまでで一通りの使い方は紹介できたので、あとはユニットテストに組み込んで使うだけ。 とはいえ、何も難しいことはない。

例えば標準ライブラリの unittest を使ったテストで使うなら、こんな感じ。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest

import freezegun
from datetime import datetime


class Test(unittest.TestCase):

    @freezegun.freeze_time('2015-10-21')
    def test(self):
        self.assertEqual(datetime.now(), datetime(2015, 10, 21))


if __name__ == '__main__':
    unittest.main()

実行してみよう。

$ python test_datetime.py -v
test (__main__.Test) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.006s

OK

ばっちりだね。

まとめ

  • freezegun を使うと datetime オブジェクトから取得できる時刻を指定したものに簡単に置き換えることができる
  • テストを書くときに便利

Ubuntu 16.04 LTS に後から GUI (X Window System) を追加する

Ubuntu16.04LTS X Window System

Ubuntu 16.04 LTS をサーバ版でインストールした後から、やっぱり GUI が欲しいよねってときがある。 今回は、そんなときの対処法について。

使った環境は次の通り。

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.1 LTS"
$ uname -r
4.4.0-38-generic

下準備

まずは apt-get コマンドの更新サイトを最新の状態にしておく。

$ sudo apt-get update

デスクトップ環境が欲しいとき

デスクトップ環境が欲しいときは ubuntu-desktop をインストールする。

$ sudo apt-get -y install ubuntu-desktop

これには長い時間がかかるので気長に待とう。

インストールが終わったら再起動する。

$ sudo shutdown -r now

これでデスクトップ環境が入る。

X Window System だけで良いとき

デスクトップ環境までは不要で X Window System だけあれば良いときは xserver-xorg をインストールする。

$ sudo apt-get -y install xserver-xorg

これだけで良い。

統計: ピアソンのカイ二乗検定で標本が理論分布と適合しているか調べる

Python SciPy 統計

例えば、ある六面ダイス (サイコロ) に歪みがないことを調べたいとする。 もしサイコロに歪みが無いなら、出る目の理論的な度数分布はどれも  \frac{1}{6} となるはず。 しかし、サイコロの出る目は無限母集団なので、実際にすべてのパターンを試して確認することができない。 つまり、全数調査は不可能ということ。 そのため、歪みがあるか否かは実際に何度かそのサイコロを振った有限な結果から推測する必要がある。 これはつまり、標本から母集団の分布を調べる推測統計になる。

上記のようなシチュエーションでは、今回紹介するピアソンのカイ二乗検定という方法が使える。 ピアソンのカイ二乗検定で適合度を調べると、実際に振ってみた結果から母集団が理論分布となっているか否かが判断できる。 この検定はノンパラメトリックなので、特定の分布の仕方には依存しないところが便利に使える。

ピアソンのカイ二乗検定の公式 (適合度)

まず、ピアソンのカイ二乗検定では、カイ二乗値という統計量を計算することになる。 これは、次のような公式で計算する。

 \chi^{2} = \displaystyle \sum_{i = 1}^{n} \frac{(O_i - E_i)^{2}}{E_i}

上記で  O は実際に観測した値、つまり標本を表している。 また、 E は期待値、つまり母集団を表している。 変数となっている  {}_i は分類、つまりサイコロでいえば出目に相当する。

数式の意味をざっくり説明すると、期待値と観測した値の差を取ってそれを二乗している。 ここで、二乗するので得られる値は必ず正の値になることが分かる。 そして、その値をまた期待値で割っている。 これは、期待値とのズレを二乗した値と、期待値との比率を計算していることになる。 そして、その比率をすべての分類ごとに足し合わせたのがカイ二乗値となる。

で、得られた値はどうするかというと、自由度と有意水準にもとづいてカイ二乗分布表と比べる。 そして、カイ二乗分布表の値よりも小さければ母集団が理論上の分布に沿っていると判断できる。 ただ、ここは今いっぺんに説明するよりも、あとから詳しく書いた方が分かりやすいと思う。

実際に計算してみよう

それでは、実際にサイコロのカイ二乗値を計算してみることにしよう。 ただ、本物のサイコロを用意して振るのがめんどくさかったので計算機のシミュレーションで済ませてしまうことにする。 これも、モンテカルロ法という名前のついた実験方法のひとつだ。

モンテカルロ法を実施する環境としては Python を使うことにしよう。

$ python --version
Python 3.5.2

まずは Python のインタプリタを起動する。

$ python

REPL が起動したら、サイコロに相当する関数 dice() を定義する。

>>> import random
>>> def dice():
...     return random.randint(1, 6)
...

この関数を実行するとサイコロの出目が 1 から 6 の間でランダムに得られる。

>>> dice()
3
>>> dice()
5
>>> dice()
5

果たして、このサイコロは歪んでいるのか、それとも歪んでいないのか。 今回は、それをカイ二乗検定で確かめたい。

ダイスを所定の回数だけ振る

まず、カイ二乗検定を使うときは、次の条件を満たすようにすべきらしい。

 np_i \ge 10

上記の  n は標本数で  p_i は分類の理論的な出る確率を表している。

今回、理論的な確率は全ての出目で  \frac{1}{6} となることを想定している。

 p_i = \frac{1}{6}

そのため  n について解くと

 n \frac{1}{6} \ge 10

から

 n \ge 10 \times 6 = 60

となるため、標本は最低でも 60 以上ほしい。 これはつまり、カイ二乗値を計算するのに最低でも 60 回はサイコロを振る必要があるということ。

そこで、ひとまず多めに 120 回振ることにした。 Python には collections.Counter という種別ごとに登場する回数を数えるためのクラスがあるので、それを使う。

>>> from collections import Counter
>>> c = Counter([dice() for _ in range(120)])
>>> c
Counter({1: 22, 3: 22, 2: 21, 6: 21, 5: 20, 4: 14})

上記はサイコロの出目が 1 のときは 22 回あった、というように読める。 つまり 1 が出た回数が一番多くて、4 が出た回数が一番少なかったということ。

上記は出た回数が多い順にソートされていて、ちょっと見づらい。 そこで、出目ごとにソートすると、こんな感じ。

>>> sorted(c.items(), key=lambda t: t[0])
[(1, 22), (2, 21), (3, 22), (4, 14), (5, 20), (6, 21)]

カイ二乗値を計算する

標本が得られたのでカイ二乗値を計算する準備が整った。 先ほどの公式をもう一度見てみよう。

 \chi^{2} = \displaystyle \sum_{i = 1}^{n} \frac{(O_i - E_i)^{2}}{E_i}

上記で  O_i が観測値なので、これはさっきサイコロを振って得られた。

そして  O_i は期待値となっている。 これは  np_i で計算できるので、こうなる。  n が標本数で  p_i が確率だった。 確率は、すべての出目で  \frac{1}{6} となるのは前述した通り。

 E_i = np_i = 120 \times \frac{1}{6} = 20

つまり期待値として使う値はすべての出目で 20 にすれば良い。

ということで、実際に数式に値を代入してみると、こうなる。

 \chi^{2} = \displaystyle \sum_{i = 1}^{n} \frac{(O_i - E_i)^{2}}{E_i} = \frac{(22 - 20)^{2}}{20} + \frac{(21 - 20)^{2}}{20} + \frac{(22 - 20)^{2}}{20} + \frac{(14 - 20)^{2}}{20} + \frac{(20 - 20)^{2}}{20} + \frac{(21 - 20)^{2}}{20}

上記で使った値として、それぞれの出目の回数を見に戻るのが大変だろうから再掲しておく。

>>> o = [i[1] for i in sorted(c.items(), key=lambda t: t[0])]
>>> o
[22, 21, 22, 14, 20, 21]

これは、計算してみると次のようになる。

 \chi^{2} = \frac{4 + 1 + 4 + 36 + 0 + 1}{20} = \frac{46}{20} = \frac{23}{10} = 2.3

Python で計算するまでもないけど、一応確かめておこう。 すでに標本の入った変数 o はあるので、あとは期待値の入った変数 e を用意しよう。 これは、すべてに 20 が入った要素数 6 のリストになる。

>>> e = [20 for _ in range(6)]
>>> e
[20, 20, 20, 20, 20, 20]

順を追って計算していこう。 まずは標本から期待値を引いて二乗しよう。 これで公式の分母部分が計算できる。

>>> [(oi - ei) ** 2 for oi, ei in zip(o, e)]
[4, 1, 4, 36, 0, 1]

それらを足すと、こうなる。

>>> sum([(oi - ei) ** 2 for oi, ei in zip(o, e)])
46

そして、それを期待値で割る。

>>> sum([(oi - ei) ** 2 for oi, ei in zip(o, e)]) / 20
2.3

ちゃんと 2.3 になった。

標本の自由度を計算する

次に、カイ二乗分布表を参照するために標本の自由度を計算しよう。 自由度というのは、標本でいくつまで種別の値が埋まると、種別全体の値が確定するかという値になっている。

今回の例なら標本数が 120 というのは、あらかじめ分かっている。 そして種別は 6 種類あって、各種別が何回登場するかは 5 種類の値が分かれば確定する。 これはつまり 1 から 5 までの出目の回数が分かってしまえば、6 が何回出たかは自然と分かるということ。 ということで、今回使う自由度  df はサイコロの面の数である 6 から 1 を引いた値で 5 になる。

 df = k = 1 = 6 - 1 = 5

ただし、これには注意点があって、使う自由度の数は母分布によって異なる。 サイコロであれば全ての種別が同じ確率で出るので一様分布になる。 一様分布であれば自由度は種別の数から 1 を引いたもので良い。 これがピアソン分布や二項分布であれば 2 を、正規分布であれば 3 を引いて使う。

カイ二乗分布表を参照する

自由度が求まったので、それにもとづいてカイ二乗分布表を参照しよう。 通常であれば、あらかじめ求めてあるものを用いる。 例えば、次のサイトなどにあるようなもの。

χ2分布表

なぜ、ここでカイ二乗分布表というものが出てくるのか疑問に思うかもしれない。 実は、先ほど計算したカイ二乗値というものは、自由度によってどのような値が出るのかが既に分かっている。 これがカイ二乗分布表というもので、ようするにどんな値がどれくらいの確率で出るか書いてある。

表の見方としては、まず先ほど求めた自由度 (df) にもとづいて 5 の行を参照する。 ひとつの行には色々な数値が並んでいるが、これは先ほど計算したのと同じカイ二乗値になっている。 それぞれのカイ二乗値は何が異なるかというと、その値になる確率が異なっている。

上にある列を見ると有意確率が書いてある。 例えば有意確率 0.99 の列で自由度が 5 の項目を見ると 0.55 となっている。 これは、自由度 5 のカイ二乗値は 99% の確率で 0.55 以上となることを表している。

有意水準を決める

カイ二乗検定というのは、仮説検定の一種となっている。 仮説検定の考え方では、ある統計量を計算したとき、その値がどれくらいの確率で出るのかを考える。

例えば、その値が出る確率が 5% 未満や 1% 未満であれば、とても起こりづらいことは明らかだ。 そんなとき、仮説検定では「珍しいことが起こった」のではなく「本来起こるべきでないことが起きた」と考える。 例えば、サイコロでいえば仮に歪みがなかったとしても偶然偏った値になるのはありえることだろう。 しかし、そんなとき仮説検定の考え方では「偶然偏った値になった」のではなく「サイコロ自体に歪みがあった」という結論にする。

もちろん、この考え方は 5% 未満や 1% 未満など、ある水準で間違った結論を導いてしまう恐れはある。 これを統計では過誤という用語で呼ぶ。 しかし、その過誤が特定の確率で起こることは折り込んだ上で、そのように考えるらしい。 この 5% や 1% といった基準のことを有意水準と呼んで、記号では  \alpha が使われることが多い。 仮説検定では、この有意水準にもとづいて統計量を考える。

有意水準は一般的に 5% (0.05) もしくは 1% (0.01) を採用することが多い。 今回は試しに 5% (0.05) を使って考えてみることにしよう。 これは 5% の確率で過誤が起こりうる、という意味でもある。

カイ二乗値を比べる

それでは、カイ二乗分布表をもう一度見てみよう。 有意確率が 5% (0.05) で自由度が 5 の項目の値は 11.07 となっている。

カイ二乗分布表の値は、それ以上の値が出る確率がどれだけあるか、というものだった。 つまり、自由度が 5 のカイ二乗値が 11.07 以上になる確率は 5% ということだ。 ようするに計算したカイ二乗値が 11.07 以上になると、サイコロが歪んでいると判断できるわけ。

では、先ほど計算したカイ二乗値はいくつだっただろうか。 その値は 2.3 で、あきらかに 11.07 と比べると小さいことが分かる。

 \chi^{2} = 2.3 \lt 11.07

つまり、このカイ二乗値は全然ありうる値で、サイコロは歪んでいないという結論に至った。

SciPy で計算してみる

ちなみに、上記の計算は手作業が多くてなかなか面倒くさい感じだと思う。 特にカイ二乗分布表を参照することなんかは間違えやすそうだ。 そこで Python の SciPy というパッケージを使った方法も紹介しておくことにする。

SciPy はサードパーティ製のパッケージなので、まずは pip を使ってインストールしておこう。

$ pip install scipy

そして、改めて Python の REPL を起動する。

$ python

先ほどの出目と同じ状況を用意する。 変数 o が標本で変数 e が期待値となる。

>>> o = [22, 21, 22, 14, 20, 21]
>>> e = [20 for _ in range(6)]

続いてカイ二乗検定をする関数を SciPy からインポートしよう。 これには scipy.stats.chisquare() を使う。

>>> from scipy.stats import chisquare

あとは、この関数に先ほどの変数 oe を渡すだけだ。

>>> chisquare(o, e)
Power_divergenceResult(statistic=2.2999999999999998, pvalue=0.80626686988512852)

するとカイ二乗値が 2.23 で、p-値というのが 0.80 と計算されているのが分かる。

p-値というのは、それがどれくらい出やすいのかを表した値になっている。 上記であれば 0.80 なので、この値が出る確率は 80% となって全然珍しくないことが分かる。

サイコロが歪んでいるかは、有意水準よりも小さいかで判断する。 例えば有意水準が 5% であれば次のようにすれば良いというわけ。

>>> chisquare(o, e).pvalue < 0.05
False

5% の確率で間違っている恐れはあるものの、このサイコロは歪んでいないことが分かった。

コクがありそうなサイコロを使ってみる

先ほど試したのは一様乱数にもとづいたサイコロだった。 次は、あえて歪んだサイコロを振って試してみよう。

今度は、次のようにして正規乱数にもとづいたサイコロを使ってみる。 このサイコロは、小さな値ほど出やすいことになる。

>>> import random
>>> def dice():
...     return int(abs(random.normalvariate(0, 1) * 6)) % 6 + 1
...

今度は SciPy を使って一気に計算させるのでサイコロを振る数も 1200 回に増やしてみる。

>>> n = 1200
>>> c = Counter([dice() for _ in range(n)])
>>> o = [i[1] for i in sorted(c.items(), key=lambda t: t[0])]
>>> e = [n / 6 for _ in range(6)]

この結果、カイ二乗値は 69.33 が得られた。 p-値は  1.4e^{-13} というとんでもなく小さな値になっている。

>>> from scipy.stats import chisquare
>>> chisquare(o, e)
Power_divergenceResult(statistic=69.330000000000013, pvalue=1.4126539924787112e-13)

これを有意水準 5% で検定すると、このサイコロは歪んでいることが分かった。

>>> chisquare(o, e).pvalue < 0.05
True

まとめ

  • カイ二乗検定を使うと理論分布と適合しているかを調べることができる