CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: k 近傍法を実装してみる

k 近傍法 (k-Nearest Neighbor algorithm) というのは、機械学習において教師あり学習分類問題を解くためのアルゴリズム。 教師あり学習における分類問題というのは、あらかじめ教師信号として特徴ベクトルと正解ラベルが与えられるものをいう。 その教師信号を元に、未知の特徴ベクトルが与えられたときに正解ラベルを予想しましょう、というもの。

k 近傍法は機械学習アルゴリズムの中でも特にシンプルな実装になっている。 じゃあ、シンプルな分だけ性能が悪いかというと、そんなことはない。 分類精度であれば、他のアルゴリズムに比べても引けを取らないと言われている。 ただし、計算量が多いという重大な欠点がある。 そのため、それを軽減するための改良アルゴリズムも数多く提案されている。

k 近傍法では、与えられた未知の特徴ベクトルを、近い場所にある教師信号の正解ラベルを使って分類する。 特徴ベクトルで近くにあるものは似たような性質を持っているはず、という考え方になっている。 今回は、そんな k 近傍法の基本的な実装を Python で書いてみることにした。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32
$ python --version
Python 3.5.3

依存パッケージをインストールする

あらかじめ、今回のソースコードで使う依存パッケージをインストールしておく。

$ pip install numpy scipy scikit-learn

最近傍法を実装してみる

k 近傍法では、未知の特徴ベクトルの近くにある k 点の教師信号を用いる。 この k 点を 1 にしたときのことを特に最近傍法 (Nearest Neighbor algorithm) と呼ぶ。 一番近い場所にある教師信号の正解ラベルを返すだけなので、さらに実装しやすい。 そこで、まずは最近傍法から書いてみることにしよう。

次のサンプルコードでは最近傍法を NearestNeighbors というクラスで実装している。 インターフェースは scikit-learn っぽくしてみた。 分類するデータセットは Iris (あやめ) を使っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np

from sklearn import datasets
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score


class NearestNeighbors(object):

    def __init__(self):
        self._train_data = None
        self._target_data = None

    def fit(self, train_data, target_data):
        """訓練データを学習する"""
        # あらかじめ計算しておけるものが特にないので保存だけする
        self._train_data = train_data
        self._target_data = target_data

    def predict(self, x):
        """訓練データから予測する"""
        # 判別する点と教師データとのユークリッド距離を計算する
        distances = np.array([self._distance(p, x) for p in self._train_data])
        # 最もユークリッド距離の近い要素のインデックスを得る
        nearest_index = distances.argmin()
        # 最も近い要素のラベルを返す
        return self._target_data[nearest_index]

    def _distance(self, p0, p1):
        """二点間のユークリッド距離を計算する"""
        return np.sum((p0 - p1) ** 2)


def main():
    # Iris データセットをロードする
    iris_dataset = datasets.load_iris()

    # 特徴データとラベルデータを取り出す
    features = iris_dataset.data
    targets = iris_dataset.target

    # LOO 法で汎化性能を調べる
    predicted_labels = []

    loo = LeaveOneOut()
    for train, test in loo.split(features):
        train_data = features[train]
        target_data = targets[train]

        # モデルを学習させる
        model = NearestNeighbors()
        model.fit(train_data, target_data)
        
        # 一つ抜いたテストデータを識別させる
        predicted_label = model.predict(features[test])
        predicted_labels.append(predicted_label)

    # 正解率を出力する
    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記のサンプルコードでは Leave-One-Out 法というやり方で交差検証をしている。

交差検証というのは、学習に使わなかったデータを使って正解を導くことができたか調べる方法を指す。 モデルの性能は、未知のデータに対する対処能力で比べる必要がある。 この、未知のデータに対する対処能力のことを汎化性能と呼ぶ。 交差検証をすることで、この汎化性能を測ることができる。

Leave-One-Out 法では、教師信号の中から検証用のデータをあらかじめ一つだけ抜き出しておく。 そして、それをモデルが正解できるのか調べるやり方だ。 抜き出す対象を一つずつずらしながら、データセットに含まれる要素の数だけ繰り返す。 他の交差検証に比べると計算量は増えるものの、厳密で分かりやすい。

上記のサンプルコードの実行結果は次の通り。

$ python nn.py 
0.96

汎化性能で 96% の正解率が得られた。

scikit-learn を使う場合

ちなみに、自分で書く代わりに scikit-learn にある実装を使う場合も紹介しておく。

次のサンプルコードは k 近傍法の実装を scikit-learn の KNeighborsClassifier に代えたもの。 インターフェースを揃えてあったので、使うクラスが違う以外は先ほどと同じソースコードになっている。 scikit-learn で最近傍法をしたいときは KNeighborsClassifier の k に 1 を指定するだけで良い。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sklearn import datasets
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier


def main():
    iris_dataset = datasets.load_iris()

    features = iris_dataset.data
    targets = iris_dataset.target

    predicted_labels = []

    loo = LeaveOneOut()
    for train, test in loo.split(features):
        train_data = features[train]
        target_data = targets[train]

        model = KNeighborsClassifier(n_neighbors=1)
        model.fit(train_data, target_data)

        predicted_label = model.predict(features[test])
        predicted_labels.append(predicted_label)

    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記のサンプルコードの実行結果は次の通り。

$ python knn_scikit.py 
0.96

当然だけど同じ班化性能になっている。

k 近傍法を実装してみる

先ほど示した最近傍法の実装では、最寄りの教師信号だけを使うものとなっていた。 今度は、より汎用的に近くにある k 点の教師信号を使う実装にしてみる。

次のサンプルコードでは KNearestNeighbors クラスのコンストラクタに k を渡せるようになっている。 実装としては、分類するときに教師信号をユークリッド距離でソートした上で k 個を取り出している。 ひとまず k については 3 を指定した。 もしこれを 1 にすれば最近傍法になる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from collections import Counter

import numpy as np

from sklearn import datasets
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score


class KNearestNeighbors(object):

    def __init__(self, k=1):
        self._train_data = None
        self._target_data = None
        self._k = k

    def fit(self, train_data, target_data):
        """訓練データを学習する"""
        # あらかじめ計算しておけるものが特にないので保存だけする
        self._train_data = train_data
        self._target_data = target_data

    def predict(self, x):
        """訓練データから予測する"""
        # 判別する点と教師データとのユークリッド距離を計算する
        distances = np.array([self._distance(p, x) for p in self._train_data])
        # ユークリッド距離の近い順でソートしたインデックスを得る
        nearest_indexes = distances.argsort()[:self._k]
        # 最も近い要素のラベルを返す
        nearest_labels = self._target_data[nearest_indexes]
        # 近傍のラベルで一番多いものを予測結果として返す
        c = Counter(nearest_labels)
        return c.most_common(1)[0][0]

    def _distance(self, p0, p1):
        """二点間のユークリッド距離を計算する"""
        return np.sum((p0 - p1) ** 2)


def main():
    iris_dataset = datasets.load_iris()

    features = iris_dataset.data
    targets = iris_dataset.target

    predicted_labels = []

    loo = LeaveOneOut()
    for train, test in loo.split(features):
        train_data = features[train]
        target_data = targets[train]

        model = KNearestNeighbors(k=3)
        model.fit(train_data, target_data)

        predicted_label = model.predict(features[test])
        predicted_labels.append(predicted_label)

    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。

$ python knn.py 
0.96

汎化性能は k=1 のときと変わらないようだ。

最適な k を探す

k 近傍法では、計算に近傍何点を使うか (ようするに k) がハイパーパラメータとなっている。 ハイパーパラメータというのは、機械学習において人間が調整する必要のあるパラメータのことをいう。

次は、最適な k を探してみることにする。 といっても、やることは単に総当りで探すだけ。

せっかくならパラメータによる汎化性能の違いを可視化したい。 そこで matplotlib も入れておこう。

$ pip install matplotlib
$ mkdir -p ~/.matplotlib
$ cat << 'EOF' > ~/.matplotlib/matplotlibrc
backend: TkAgg
EOF

次のサンプルコードでは k を 1 ~ 20 の間で調整しながら総当りで汎化性能を計算している。 データセットごとに最適な k が異なるところを見ておきたいので Iris (あやめ) と Digits (数字) で調べることにした。 自分で実行するときは、データセットのロード部分にあるコメントアウトを切り替えてほしい。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt

from sklearn import datasets
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier


def main():
    # データセットをロードする
    dataset = datasets.load_digits()
    # dataset = datasets.load_iris()

    # 特徴データとラベルデータを取り出す
    features = dataset.data
    targets = dataset.target

    # 検証する近傍数の上限
    K = 20
    ks = range(1, K + 1)

    # 使う近傍数ごとに正解率を計算する
    accuracy_scores = []
    for k in ks:
        # Leave-One-Out 法で汎化性能を測る
        predicted_labels = []
        loo = LeaveOneOut()
        for train, test in loo.split(features):
            train_data = features[train]
            target_data = targets[train]

            # モデルを学習させる    
            model = KNeighborsClassifier(n_neighbors=k)
            model.fit(train_data, target_data)
    
            # 一つだけ取り除いたテストデータを識別させる
            predicted_label = model.predict(features[test])
            predicted_labels.append(predicted_label)
    
        # 正解率を計算する
        score = accuracy_score(targets, predicted_labels)
        print('k={0}: {1}'.format(k, score))

        accuracy_scores.append(score)

    # 使う近傍数ごとの正解率を折れ線グラフで可視化する
    X = list(ks)
    plt.plot(X, accuracy_scores)

    plt.xlabel('k')
    plt.ylabel('accuracy rate')
    plt.show()
    

if __name__ == '__main__':
    main()

まずはデータセットとして Digits を使ったときから。 実行結果は次のようになる。

$ python knn_tuning.py 
k=1: 0.988313856427379
k=2: 0.986644407345576
k=3: 0.988313856427379
k=4: 0.9877573734001113
k=5: 0.9877573734001113
k=6: 0.9855314412910406
k=7: 0.9855314412910406
k=8: 0.9844184752365053
k=9: 0.9833055091819699
k=10: 0.9821925431274346
k=11: 0.9844184752365053
k=12: 0.9827490261547023
k=13: 0.9844184752365053
k=14: 0.9816360601001669
k=15: 0.9816360601001669
k=16: 0.9805230940456316
k=17: 0.9805230940456316
k=18: 0.9794101279910963
k=19: 0.9766277128547579
k=20: 0.9782971619365609

どうやら Digits のときは k を 1 か 3 にするのが良さそうだ。 f:id:momijiame:20170318133735p:plain

続いて Iris を使ったとき。

$ python knn_tuning.py 
k=1: 0.96
k=2: 0.9466666666666667
k=3: 0.96
k=4: 0.96
k=5: 0.9666666666666667
k=6: 0.96
k=7: 0.9666666666666667
k=8: 0.9666666666666667
k=9: 0.9666666666666667
k=10: 0.9733333333333334
k=11: 0.9733333333333334
k=12: 0.96
k=13: 0.9666666666666667
k=14: 0.9733333333333334
k=15: 0.9733333333333334
k=16: 0.9666666666666667
k=17: 0.9733333333333334
k=18: 0.9733333333333334
k=19: 0.98
k=20: 0.98

今度は全然違うグラフになった。 どうやら Iris なら k は 20 にするのが良いらしい。 もしかすると、さらに増やすと良い可能性もある? f:id:momijiame:20170318133823p:plain

まとめ

今回は Python を使って教師あり学習の分類問題を解くためのアルゴリズムの一つ、k 近傍法を実装してみた。 k 近傍法は単純な割に分類精度は決して低くないものの、計算量が多いという欠点がある。 k 近傍法では、計算に近傍何点を使うのが適しているかはデータセットによって異なる。 そのため、異なる k を使って汎化性能を測定して決定しよう。

ちなみに、計算量の多さを軽減するための手法としては、圧縮型 k 近傍法、分岐限定法、疑似最近傍探索などがあるようだ。 それらについては、機会があれば改めて実装してみたい。

はじめてのパターン認識

はじめてのパターン認識

Python: データセットを標準化する効果を最近傍法で確かめる

データセットの標準化については、このブログでも何回か扱っている。 しかし、実際にデータセットを標準化したときの例については試していなかった。

blog.amedama.jp

blog.amedama.jp

そこで、今回は UCI の提供する小麦 (seeds) データセットを最近傍法で分類したときに、スコアが上がる様を見てみたいと思う。 あらかじめ、いくつかの説明変数が教師信号として与えられるので、そこから小麦の品種を分類 (Classification) する。

UCI Machine Learning Repository: seeds Data Set

今回使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32
$ python --version
Python 3.5.3

下準備

まずは今回使う Python のパッケージを pip でインストールする。 どれも科学計算系で定番のやつ。

$ pip install numpy pandas scipy scikit-learn

データセットを標準化しない場合

今回使う最近傍法というアルゴリズムでは、分類したい点から最も近くにあるデータの種別を使って分類する。 近さにはユークリッド距離を使うため、データセットの説明変数の大きさや単位に影響を受けやすい。 例えば説明変数の中に 1,000m ~ 10,000m を取る次元と 0.1cm ~ 1cm を取る次元があるとしよう。 当然ながら、ユークリッド距離を計算するとそのままでは前者の次元の影響が大きくなってしまう。 今回使うデータセットでも原理的には同じ問題が発生する。

次のサンプルコードでは、データセットを標準化しない状態で最近傍法を使った分類を実施している。 そして Leave-One-Out 法を使って、計測した汎化性能を出力する。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier


def main():
    # 小麦データセットをダウンロードする
    dataset_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00236/seeds_dataset.txt'  # noqa
    df = pd.read_csv(dataset_url, delim_whitespace=True, header=None)

    # データフレームから説明変数と目的変数を取り出す
    features = df.loc[:, :6].get_values()
    targets = df.loc[:, 7].get_values()

    # 予測した結果の入れ物
    predicted_labels = []

    # LOO で交差検証する
    loo = LeaveOneOut()
    for train, test in loo.split(features):
        train_data = features[train]
        target_data = targets[train]

        # k-NN 法を使う
        model = KNeighborsClassifier(n_neighbors=1)
        # 訓練データを学習させる
        model.fit(train_data, target_data)
        # テストデータを予測させる
        predicted_label = model.predict(features[test])
        # 予測した結果を追加する
        predicted_labels.append(predicted_label)

    # 正解率を出力する
    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記のサンプルコードの実行結果は次の通り。 データセットを標準化しない状態では約 90.5% の汎化性能が得られた。

$ python withoutnorm.py
0.904761904762

データセットを標準化する場合

それでは、次はデータセットを標準化する場合を試してみよう。 標準化されたデータセットでは、説明変数の各次元の値が平均 0 で標準偏差が 1 になる。 つまり、元々の単位や大きさは無くなってそれぞれの値の間隔の比率だけが残されることになる。

次のサンプルコードでは、先ほどとデータセットを標準化するところだけ変更している。 データセットの標準化には scikit-learn に用意されている StandardScaler を用いた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler


def main():
    dataset_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00236/seeds_dataset.txt'  # noqa
    df = pd.read_csv(dataset_url, delim_whitespace=True, header=None)

    features = df.loc[:, :6].get_values()
    targets = df.loc[:, 7].get_values()

    # データセットを Z-Score に標準化する
    sc = StandardScaler()
    sc.fit(features)
    normalized_features = sc.transform(features)

    predicted_labels = []

    loo = LeaveOneOut()
    for train, test in loo.split(normalized_features):
        train_data = normalized_features[train]
        target_data = targets[train]

        model = KNeighborsClassifier(n_neighbors=1)
        model.fit(train_data, target_data)

        predicted_label = model.predict(normalized_features[test])
        predicted_labels.append(predicted_label)

    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 今度は汎化性能が約 93.8% に上昇している。 データセットを標準化するだけで分類精度が 3.3% 上がった。

$ python withnorm.py
0.938095238095

まとめ

データセットを標準化することで、使う機械学習アルゴリズムによっては汎化性能を上げることができる。

Python: Pykka でアクターモデルについて学ぶ

アクターモデルというのは、並行処理のプログラミングモデルの一つだ。 並行処理という言葉からは、まずマルチスレッドとかをイメージすると思うけど、それよりも抽象度の高い概念となっている。 つまり、アクターモデルというのはマルチスレッドなどを用いて構築することになる。 どちらかといえばプロセス間通信 (IPC) の技法であって、共有メモリやロック、RPC と比較するものかもしれない。

そんなアクターモデルは、概念とか使ったときの嬉しさを理解・実感するのがなかなか難しいモデルだとも思う。 理由としては、使い始めるまでに必要なコード量が多かったり、それなりの規模のアプリケーションで使わないとメリットが分かりづらい点が挙げられる。 ただ、これはあくまで主観的なものだけど、アクターモデルをベースに組まれたアプリケーションは規模が大きくなっても並行処理をしているコードが読みやすい。 共有メモリやロックを使う場合だと、平行処理できるパートの量と可読性がトレードオフになりがちな気がするのとは対照的な感じがしている。

上記の理由としては、それぞれの手法のアプローチが次のように根本的に異なるためだと思う。

  • 共有メモリやロックを使うやり方
    • 考え方のベースはシングルスレッド
    • ボトルネックになっている末端の処理を局所的に並行にしていく
  • アクターモデル
    • 考え方のベースはマルチスレッド
    • 末端の処理はシングルスレッドで処理される
      • ただし、アクターを横に並べることで並行度を上げられる

今回は、そんなアクターモデルを Python で実装するためのフレームワークである Pykka を使ってみることにする。 ざっと調べた感じ、この Pykka が Python で汎用的なアクターモデルを実装するためのパッケージとしては最も有名そうだった。 Pykka は Scala/Java でアクターモデルを実装するためのフレームワークである Akka の影響を受けている。 使われている用語の多くも Akka から取られているらしいけど、まあ使う分には特に意識する必要はない。

ただし、Pykka は単純に Akka を Python で再実装したものではない。 次のコンポーネントは明示的にサポートしないことが宣言されている。 もし欲しければ自分で作り込まなきゃいけない。

  • スーパーバイザ
    • セルフヒーリングとかを実現するやつ
  • メッセージルータ
    • 並行処理を書きやすくしたりアプリケーションの骨子を作るためのやつ
  • ネットワーク越しの分散処理
    • 並行処理のタスクをネットワーク越しの各ホストに分配するやつ

今回使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32
$ python --version
Python 3.6.0

インストール

インストールは Python のパッケージマネージャである pip を使ってさくっとできる。

$ pip install pykka

もし pip が入っていないときはインストールしよう。

$ curl https://bootstrap.pypa.io/get-pip.py | sudo python

あと、システムの実行環境を汚さないように Python 仮想環境を使えるようにしておくのがおすすめ。 これには例えば virtualenv なんかを使う。

$ sudo pip install virtualenv

そこらへんはお好みで。

はじめてのアクターモデル

アクターモデルでは、その名の通りアクターというコンポーネントを組み合わせてアプリケーションを構築していく。

アクターという概念を構成する要素は、実のところたったの三つしかない。 まず、他のアクターから何らかのデータを受け取るために用意されたメッセージキューが一つ目。 そして、その受け取ったデータを渡す先となるメッセージハンドラが二つ目。 最後に、メッセージキューからデータを取り出して、それをメッセージハンドラにディスパッチする役目のスレッドが三つ目だ。 この三つの要素が、それぞれのアクターインスタンスに必ず備わっている。 メッセージキューとスレッドはデフォルトで一つずつ、メッセージハンドラについては自分で何をするか記述する。

重要な点としては、アクター同士はメモリなどを共有することがない。 することがないというよりも、する必要がない・するとアクターモデルを使う意味が薄れるという方が正しい。 アクターモデルでは、何かを共有することなくお互いのメッセージキューにメッセージを放り込み合うことで情報をやり取りして処理を進めていく。 いわゆるメッセージパッシングと呼ばれるものだ。

また、アクターの中と外の世界は非同期の壁で断絶されている。 アクターの中身を、その外側から同期的に直接触ることはできない。 触ることができてしまうと、処理の競合が起こってしまうからだ。 アクターの中を触るには、いわゆる Future を必ず介すことになる。 処理はキューイングされていつかは実行されるけど、具体的にそれがいつかは分からない状態でアクセスする。

説明が長くなったけど、そろそろ具体的なサンプルコードを見ていくことにしよう。 以下は Pykka を使って作る、最も単純なアプリケーションの一つだ。 このアプリケーションではメッセージを受け取って、それを出力するだけのアクターを定義している。 説明はコードの中にコメントとして記述している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):
    """メッセージを受け取って挨拶をするアクター"""

    def on_receive(self, message):
        """メッセージが届いたときに実行されるハンドラ"""
        # メッセージの中から名前を取り出す
        name = message['name']
        # 内容をプリントする
        print('Hello, {0}!'.format(name))


def main():
    # アクターを起動する
    actor_ref = GreetingActor.start()

    # アクターにメッセージを送る (一方向)
    actor_ref.tell({'name': 'World'})

    # アクターを停止する (本当はちょっと待った方が良いかも)
    actor_ref.stop()


if __name__ == '__main__':
    main()

アクターは pykka.ThreadingActor というクラスを継承して作る。 on_receive() というメソッドはアクターがメッセージを受け取って、それがディスパッチされたときに発火するハンドラだ。 サンプルコードでは、そのアクターを起動してメッセージを送りつけている。 アクターは start() メソッドが呼ばれた瞬間からメッセージキューを読み出すスレッドが起動して動き出す。 あとは、アクターに tell() メソッドでメッセージを送ることができる。

また、ポイントとしてはアクターを起動したときに返ってくる内容が ActorRef というアクターへの間接的な参照になっている点だ。 つまり、起動したアクターそのものを直接触ることはできず、必ずこの参照を経由してアクセスすることになる。 直接触ることを防ぐことによって、アクターの中で競合を起こさないようにしているのだろう。

ちなみに、上記のサンプルコードではメッセージを送りつけた直後にアクターを停止しているけど、本当はちょっと待った方がお行儀が良いのかもしれない。 ちょっと待つというのは、具体的にはメインスレッドをスリープさせるということだ。 何故なら、キューからのメッセージの取り出しおよびハンドラの発火は別スレッドで進むため、すぐに実行される保証がないから。 とはいえ、他にやっていることもないからすぐに処理されるはずで、現実的には問題ないんだけど。

上記のサンプルコードを実行すると、次のようになる。

$ python hellopykka.py
Hello, World!

ただのハローワールドだけど、これはアクターモデルで実装されている!

アクターモデルのコンストラクタに引数を渡す

ここからは、アクターモデルというより Pykka の使い方の説明に入っていく。

先ほどのサンプルコードでは、アクターにメッセージの形で引数を渡した。 とはいえ、起動時の初期パラメータを設定したい場合も多いと思う。 そんなときはコンストラクタに引数を用意すれば良い。 親クラスのコンストラクタは必ず呼ぶこと。 というより、呼ばないと例外になる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def __init__(self, greeting='Hello'):
        # 親クラスのコンストラクタは必ず呼ぶ
        super().__init__()

        # 挨拶の内容をカスタマイズできるようにする
        self.greeting = greeting

    def on_receive(self, message):
        name = message['name']
        print('{0}, {1}!'.format(self.greeting, name))


def main():
    # アクターを起動するときに使ったパラメータがコンストラクタに渡される
    actor_ref = GreetingActor.start('Hajimemashite')

    actor_ref.tell({'name': 'Sekai'})

    actor_ref.stop()


if __name__ == '__main__':
    main()

コンストラクタに渡す引数はアクターの起動時に渡す。

上記のサンプルコードを実行した結果は次の通り。

$ python initpykka.py
Hajimemashite, Sekai!

コンストラクタで渡したメッセージが使われている。

返り値のあるメッセージパッシング

これまでの例では、アクターにメッセージを送っておしまいという、一方的なものだった。 アクターに対して一方的にメッセージを送りつけるには tell() メソッドを使う。 とはいえ、何かメッセージを渡して、その結果として返り値を戻してほしいというのはよくある。

そんなときはメッセージを送りつけるときに ask() メソッドを使う。 また、メッセージハンドラでは返り値を戻すように作る。 返り値を受け取る側では、いくつか受け取り方を選ぶことができる。 次のサンプルコードでは、戻り値のあるパターンを実装している。 そして、返り値の受け取り側でもいくつかのやり方を試している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def on_receive(self, message):
        name = message['name']
        # 返り値があると ask() メソッドでその値が得られるようになる
        return 'Hello, {}!'.format(name)


def main():
    actor_ref = GreetingActor.start()

    # ActorRef#ask() を使うとアクターから返り値が得られる (返事があるまでブロックする)
    answer = actor_ref.ask({'name': 'World'})
    print(answer)

    # ブロックする時間にタイムアウトを設定したいときは timeout を指定する
    answer = actor_ref.ask({'name': 'Timeout'}, timeout=1)
    print(answer)

    # 処理をブロックさせたくないときは block=False を指定すると
    # 返り値が Future オブジェクトになる
    future = actor_ref.ask({'name': 'Future'}, block=False)
    # Future オブジェクトから結果を取り出す (ここでブロックする)
    answer = future.get()
    print(answer)

    future = actor_ref.ask({'name': 'Future-Timeout'}, block=False)
    # Future から結果を取り出すときのタイムアウトも指定できる
    answer = future.get(timeout=1)
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記で返り値を受け取るやり方がいくつかあることについて補足しておく。 前述した通り、アクターの内外は非同期の壁で断絶されている。 つまり、メッセージに対する返事が返ってくるタイミングは、将来であることは分かるものの具体的にいつかは分からないのだ。

そのため、返事の受け取り方として「返ってくるまですぐにブロックする」か「とりあえず受理された旨の返事だけ受け取る」やり方を選べるわけだ。 後者では、いわゆる Future を受け取っている。

先ほどのサンプルコードを実行した結果は次の通り。

$ python askpykka.py
Hello, World!
Hello, Timeout!
Hello, Future!
Hello, Future-Timeout!

アクターの中で発生した例外を処理する

もし、アクターの返り値のある処理で例外が発生したことに気づくことができないと大変だ。 Pykka では ActorRef#ask() メソッド経由で呼び出された処理の中で例外が起こったときは、呼び出し元にそれが伝搬するようになっている。

次のサンプルコードではメッセージハンドラの中で意図的に例外を上げている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class ExceptionActor(pykka.ThreadingActor):

    def on_receive(self, message):
        raise Exception('Oops!')


def main():
    actor_ref = ExceptionActor.start()

    try:
        # ちなみに ActorRef.tell() を使うときは、投げっぱなしなので
        # 呼び出し先で例外が起こっても呼び出し元でキャッチできない
        actor_ref.ask({'name': 'World'})
    except Exception as e:
        print(e)

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。

$ python trypykka.py
Oops!

親スレッドが死んだときにアクターのスレッドも停止させたい

ちなみに、先ほどのサンプルコードで例外をキャッチしないでいるとどうなるか試してみただろうか。 実はアプリケーションの実行はそのまま継続することになる。 どうしてそんなことが起こるかというと、子スレッドがデーモンスレッドになっていないためだ。 アプリケーションは全てのスレッドが停止するまで処理を継続することになる。

もし、親スレッドが停止したときに子スレッドも停止させたい、つまり一家心中を図りたいときは次のようにする。 アクターには use_daemon_thread というアトリビュートが用意されているので、そのフラグを True にしよう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class ExceptionActor(pykka.ThreadingActor):
    # 親のスレッドが終了したら、同時に停止させたいときはこうする
    # 親が死んだとき一家心中になるスレッドをデーモンスレッドという
    use_daemon_thread = True

    def on_receive(self, message):
        raise Exception('Oops!')


def main():
    actor_ref = ExceptionActor.start()

    # デーモンスレッドになっていないと、ここで例外になって親スレッドが死ぬ
    actor_ref.ask({'name': 'World'})

    # すると、このコードが実行されないのでアクターのスレッドが停止しない
    actor_ref.stop()


if __name__ == '__main__':
    main()

こうすれば、例外を取りこぼしたことでメインスレッドが停止したときに、そこから起動されたアクターのスレッドも同時に停止する。

$ python daemonpykka.py
Traceback (most recent call last):
  File "daemonpykka.py", line 27, in <module>
    main()
  File "daemonpykka.py", line 20, in main
    actor_ref.ask({'name': 'World'})
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 435, in ask
    return future.get(timeout=timeout)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 52, in get
    compat.reraise(*self._data['exc_info'])
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/compat.py", line 24, in reraise
    raise value
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 304, in _handle_receive
    return self.on_receive(message)
  File "daemonpykka.py", line 13, in on_receive
    raise Exception('Oops!')
Exception: Oops!

アクターのフラグを切り替えて、挙動の違いを確認してみよう。

アクタープロキシを使って処理を呼び出す

これまでの例では pykka.ThreadingActor の持つ on_receive() というメソッドをオーバーライドすることでアクターを実装してきた。 しかし、このやり方はだいぶプリミティブで Pykka の低レイヤーな API を使っている。 実際には、アクタープロキシという機構を使って、アクターのメソッドをあたかも直接呼び出しているような風に処理できる。

次のサンプルコードではアクターで独自のメソッド greeting を定義している。 その呼び出し側では ActorRef から ActorProxy を生成して、あたかもメソッドを直接呼び出しているように扱っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def greeting(self):
        print('Hello, World!')


def main():
    actor_ref = GreetingActor.start()

    # ActorProxy を取得する
    actor_proxy = actor_ref.proxy()

    # ActorProxy では定義されているメソッドを直接呼べる (ように見える)
    actor_proxy.greeting()

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。

$ python proxypykka.py
Hello, World!

アクタープロキシを使って返り値のある場合

先ほどのサンプルコードでは、アクタープロキシを使って返り値のない場合を扱っていた。 今度は返り値のあるときを試してみる。 アクタープロキシを使って返り値を受け取るときは、実は必ず Future を介すことになる。

次のサンプルコードではアクタープロキシを使った上で返り値を受け取っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def greeting(self):
        return 'Hello, World!'


def main():
    actor_ref = GreetingActor.start()
    actor_proxy = actor_ref.proxy()

    # 返り値のある呼び出しでは Future オブジェクトが返る
    future = actor_proxy.greeting()
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

アクタープロキシ経由で実行したメソッドは、必ず Future 経由で返り値を受け取ることになる。

上記の実行結果は次の通り。

$ python futurepykka.py
Hello, World!

アクターのアトリビュートにアクタープロキシ経由でアクセスする

前述した通り、アクターの中と外は非同期の壁で断絶されていることから、直接同期的に触ることはできない。 それをすると、アクターの処理が競合する恐れがあるためだ。

では、アクターのアトリビュートを触るのにも、必ずアクセサに相当するメソッドを定義する必要があるのだろうか? だとすると相当に面倒くさいと思うんだけど、これは定義する必要がない。 アクタープロキシ経由であればアトリビュートも Future 経由にはなるが、触ることができる。

次のサンプルコードではアクターに message というアトリビュートを定義している。 メインスレッドからは、それをアクタープロキシ経由で読み出している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class MyActor(pykka.ThreadingActor):

    def __init__(self):
        super().__init__()
        # アクターでないオブジェクトをアトリビュートとして持つ
        self.message = 'Hello, World!'


def main():
    actor_ref = MyActor.start()
    actor_proxy = actor_ref.proxy()

    # アクターでないアトリビュートにアクセスすると Future が返る
    future = actor_proxy.message
    # 値を取り出す
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

ポイントとしては、アトリビュートへのアクセスであっても Future が返る点だ。 こうすればアクセスが非同期に処理されるため、処理が競合することはない。

上記の処理結果は次の通り。

$ python attrpykka.py
Hello, World!

アクターのメンバが持つメソッドをアクタープロキシ経由で呼び出す

先ほどのサンプルコードではアクターのメンバがただの文字列だった。 次は、これがメソッドを持ったインスタンスだったときの話。

次のサンプルコードでは、アクター MyActor がアクターでないアトリビュート NonActor のインスタンスをメンバに持っている。 それを、アクタープロキシ経由で外から呼び出している。 この場合、メンバとなるアクターでないオブジェクトには pykka_traversable というフラグが必要になる。 このフラグを立てることで、アクタープロキシから呼び出すことができるようになる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class NonActor(object):
    """アクターのアトリビュートになる非アクタークラス"""
    # pykka_traversable アトリビュートを True にするとプロキシ経由でアクセスできるようになる
    pykka_traversable = True

    def greeting(self):
        """プロキシ経由で外部から呼びたいメソッド"""
        return 'Hello, World!'


class MyActor(pykka.ThreadingActor):

    def __init__(self):
        super().__init__()
        # アクターでないオブジェクトをアトリビュートとして持つ
        self.non_actor = NonActor()


def main():
    actor_ref = MyActor.start()
    actor_proxy = actor_ref.proxy()

    # アクターのアトリビュートのオブジェクトが持つメソッドを呼び出せるようになる
    future = actor_proxy.non_actor.greeting()
    # Future が返ってくるので値を取り出す
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

このパターンでも返り値は Future になる。

上記のサンプルコードの実行結果は次の通り。

$ python methodpykka.py
Hello, World!

アクターモデルを使った並行処理

これまでのサンプルコードでは、アクターは定義しているものの平行処理をしていなかった。 なので、説明の都合上、仕方なかったとはいえ一体何のためにこんなことを、という印象だったかもしれない。 ここからは、ようやくアクターモデルを使って並行処理をしてみる。

アクターモデルの並行処理では、基本的にアクターをたくさん用意してタスクをそれぞれのアクターに振り分けていく。 アクターには、それぞれメッセージを処理するスレッドが起動しているので、処理をバラバラに並列で進めることができるわけ。

次のサンプルコードでは、 アクター MyActor に擬似的に処理に時間のかかるメソッド process() を用意している。 時間がかかる、というところはスレッドをスリープすることで表現した。 ここでは、時間のかかる処理を 4 回呼び出している。 それぞれの処理には 1 秒かかる。 一つのアクターで直列に処理すれば、単純に 4 秒かかるはずの処理を、ここでは二つのアクターに振り分けている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id, sleep_sec):
        """時間がかかる処理を模したメソッド"""
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1} s'.format(processing_id, sleep_sec))


def main():
    # アクターを二つ起動する
    actor_proxy_a = MyActor.start().proxy()
    actor_proxy_b = MyActor.start().proxy()

    # 処理を開始した時刻を記録しておく
    start_time = time.time()

    # それぞれのアクターに仕事を二つずつ割り当てる
    # 処理はアクターのスレッドがキューから取り出して開始する
    future1 = actor_proxy_a.process(1, 1)
    future2 = actor_proxy_a.process(2, 1)

    future3 = actor_proxy_b.process(3, 1)
    future4 = actor_proxy_b.process(4, 1)

    # 計算が終わるのを待つ (threading#join() のようなものと考えると分かりやすい)
    future1.get()
    future2.get()
    future3.get()
    future4.get()

    # 処理が終わったら時刻を記録する
    end_time = time.time()

    # 一連の処理にかかった時間を出力する
    elapsed_time = end_time - start_time
    print('Elapsed Time: {0} s'.format(elapsed_time))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

さて、処理が終わるのに実際は何秒かかるだろうか?

上記のサンプルコードの実行結果は次の通り。 二つのアクターで処理した結果として本来一つのアクターなら 4 秒かかるところ 2 秒で完了している。

$ python cuncurrentpykka.py
Completed: ID 3 in 1 s
Completed: ID 1 in 1 s
Completed: ID 2 in 1 s
Completed: ID 4 in 1 s
Elapsed Time: 2.0071158409118652 s

アクターモデルでも設計によっては競合が起こる

アクターモデルを使えば、競合が起きないというような説明をされることがたまにあるようだ。 しかし、その「競合が起きない」というのは、より低レイヤーな側面からアクターモデルを見たときの話だと思う。 あくまで、アクターのメッセージキューを処理するスレッドが一つだけなので、アクターの中で処理が競合しないというのに過ぎない。 より高レイヤーに、アプリケーションという側面からアクターモデルを見たとき、競合は容易に起こる。 それぞれの競合は全く別の次元の話なんだけど、もし混ぜて考えてしまうと危険だ。 次は、その競合について見てみよう。

サンプルコードでは、銀行口座を模したモデルを用意した。 AccountActor という銀行口座を模したアクターを UserActor というユーザを模したアクターが操作する。 UserActor はまず銀行口座から預金残高を読み取って、その値を元に預金残高を更新する。 ここでポイントなのは AccountActor の API がアトミックになっていないという点だ。 つまり、銀行口座の読み取りから更新までの間に別のアクターからの処理が割り込む余地がある。 サンプルコードでは、口座の読み取りから更新までの間に意図的にスリープを入れることで競合が起こりやすいようにしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

import pykka


class AccountActor(pykka.ThreadingActor):
    """銀行口座を模したアクター"""

    def __init__(self, initial_money=0):
        super(AccountActor, self).__init__()

        self._money = initial_money

    def get_balance(self):
        # 口座の残高を得る
        return self._money

    def set_balance(self, money):
        # 口座の残高を設定する
        self._money = money


class UserActor(pykka.ThreadingActor):
    """銀行口座を使うユーザを模したアクター"""

    def __init__(self, username, account_actor):
        super(UserActor, self).__init__()

        self._username = username
        self._account_actor = account_actor

    def save(self, amount):
        # 残高を読み出す
        future = self._account_actor.get_balance()
        balance = future.get()
        print('{0} read balance: {1}'.format(self._username, balance))
        # 残高を読んでから次の処理に移るのに時間がかかった!
        time.sleep(1)
        print('{0} sleep...'.format(self._username))
        # 口座残高を更新する
        new_balance = balance + amount
        future = self._account_actor.set_balance(new_balance)
        future.get()
        print('{0} set balance: {1}'.format(self._username, new_balance))


def main():
    # アクターを起動する
    account_actor_ref = AccountActor.start(100)
    account_actor_proxy = account_actor_ref.proxy()

    user_a_ref = UserActor.start('UserA', account_actor_proxy)
    user_a_proxy = user_a_ref.proxy()

    user_b_ref = UserActor.start('UserB', account_actor_proxy)
    user_b_proxy = user_b_ref.proxy()

    # ユーザ A と B が同時に口座に 50 を入金する
    future_a = user_a_proxy.save(50)
    future_b = user_b_proxy.save(50)

    future_a.get()
    future_b.get()

    # 最終的な口座の残高を出力する
    print('Final Balance: {0}'.format(account_actor_proxy.get_balance().get()))

    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

二つのユーザを模したアクターは銀行残高をそれぞれ 50 だけ増やすべく処理を実施する。 初期の預金残高は 100 なので、最終的には 200 になっていないとおかしい。

では、上記のサンプルコードを実行してみよう。

$ python conflictpykka.py
UserA read balance: 100
UserB read balance: 100
UserA sleep...
UserB sleep...
UserA set balance: 150
UserB set balance: 150
Final Balance: 150

最終的な口座残高は 150 となってしまった。 見事に競合が発生している。

問題は、アクターの API が更新に対してアトミックになっていなかった点だ。 次の改良したサンプルコードでは、預金残高を更新するメソッドを update_balance() という形でアトミックにしている。 現在の預金残高を元に、口座のアクター自体が更新をかけるので、間に別のアクターの処理が入り込む余地はない。 アクターの中はメッセージキューを処理する一つのスレッドで処理されているので、同時に同じメソッドが走る心配はない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class AccountActor(pykka.ThreadingActor):

    def __init__(self, initial_money=0):
        super(AccountActor, self).__init__()

        self._money = initial_money

    def get_balance(self):
        return self._money

    def update_balance(self, money):
        # アトミックに口座残高を更新する
        self._money = self._money + money


class UserActor(pykka.ThreadingActor):

    def __init__(self, username, account_actor):
        super(UserActor, self).__init__()

        self._username = username
        self._account_actor = account_actor

    def save(self, amount):
        # 口座残高を更新する
        future = self._account_actor.update_balance(amount)
        future.get()
        print('{0} update balance: {1}'.format(self._username, amount))


def main():
    # アクターを起動する
    account_actor_ref = AccountActor.start(100)
    account_actor_proxy = account_actor_ref.proxy()

    user_a_ref = UserActor.start('UserA', account_actor_proxy)
    user_a_proxy = user_a_ref.proxy()

    user_b_ref = UserActor.start('UserB', account_actor_proxy)
    user_b_proxy = user_b_ref.proxy()

    # ユーザ A と B が同時に口座に 50 を入金する
    future_a = user_a_proxy.save(50)
    future_b = user_b_proxy.save(50)

    future_a.get()
    future_b.get()

    # 最終的な口座の残高を出力する
    print('Final Balance: {0}'.format(account_actor_proxy.get_balance().get()))

    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。 今度はちゃんと預金残高が 200 に更新されている。

$ python conflictpykka2.py
UserA update balance: 50
UserB update balance: 50
Final Balance: 200

アクターモデルでも設計によってはデッドロックが起こる

典型的なデッドロックは、マルチスレッドでリソースをロックする順番が不定になっていたときに起こる。 アクターモデルはロックを使わないので、それならデッドロックが起こらないかというと、そんなことはない。

次のサンプルコードではデッドロックを発生させている。 ActorAActorB のメソッドを呼んで、ActorBActorA のメソッドを呼ぶ、というように操作が交差している。 それぞれのメソッドは各アクターのスレッドで非同期に処理されるので、返り値で得られる Future を get() すると完了するまでブロックする。 すなわち、双方がお互いの処理の完了を待ってデッドロックを起こしてしまう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import signal
import os

import pykka
from pykka import debug


class ActorA(pykka.ThreadingActor):

    def method_a(self, actor_b_proxy):
        # 2. ActorB の method_b を呼ぶ
        future = actor_b_proxy.method_b()
        # 3. Future の中身を取り出そうとすると ActorA のスレッドは取り出せるまでブロックする
        future.get()


class ActorB(pykka.ThreadingActor):

    def __init__(self, actor_a_proxy):
        super(ActorB, self).__init__()
        self.actor_a_proxy = actor_a_proxy

    def method_b(self):
        # 4. ActorA の method_a を呼ぶ (本当は自身のプロキシを渡す必要があるけど)
        future = self.actor_a_proxy.method_a()
        # 5. Future の中身を取り出そうとすると ActorB のスレッドは取り出せるまでブロックする
        future.get()
        # XXX: ActorA のスレッドは既にブロックしているのでデッドロックを起こす


def main():
    """意図的にデッドロックを起こしてみる"""

    # デバッグレベルのログが出力されるようにする
    logging.basicConfig(level=logging.DEBUG)

    # シグナル SIGUSR1 でスレッドのトレースバックを出力する
    signal.signal(signal.SIGUSR1, debug.log_thread_tracebacks)

    # アクターを起動する
    actor_a_ref = ActorA.start()
    actor_a_proxy = actor_a_ref.proxy()
    # ActorB には ActorA のプロキシを渡しておく
    actor_b_ref = ActorB.start(actor_a_proxy)
    actor_b_proxy = actor_b_ref.proxy()

    # 1. ActorA の method_a を呼ぶ (引数として ActorB を渡す)
    actor_a_proxy.method_a(actor_b_proxy)

    # メインスレッドでは PID を出力する
    pid = os.getpid()
    print(f'PID: {pid}')

    # アクターがデーモンスレッドではないので、ここまできてもプログラムは終了しない


if __name__ == '__main__':
    main()

上記を実行すると、何も表示されずに処理が止まる。

$ python deadlockpykka.py
DEBUG:pykka:Registered ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Starting ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Registered ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
DEBUG:pykka:Starting ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
PID: 9199

サンプルコードでプロセス ID を表示するようにしているところに気づいただろうか? また Pykka の debug.log_thread_tracebacks という関数がシグナル SIGUSR1 に対して登録されている。 そこで、別のターミナルからこのハンドラに対してシグナルを送ってみよう。

$ kill -SIGUSR1 9199

すると、今のスレッドが何処を処理しているのかトレースバックが表示される。 この表示からは、二つのアクターのスレッドが future.get() でブロックしていることが分かる。

$ python deadlockpykka.py
DEBUG:pykka:Registered ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Starting ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Registered ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
DEBUG:pykka:Starting ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
PID: 9199
CRITICAL:pykka:Current state of ActorB-2 (ident: 123145451573248):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "deadlockpykka.py", line 31, in method_b
    future.get()
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 50, in get
    self._data = self._queue.get(True, timeout)
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

CRITICAL:pykka:Current state of ActorA-1 (ident: 123145446318080):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "deadlockpykka.py", line 18, in method_a
    future.get()
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 50, in get
    self._data = self._queue.get(True, timeout)
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

CRITICAL:pykka:Current state of MainThread (ident: 140736596534208):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1290, in _shutdown
    t.join()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/debug.py", line 68, in log_thread_tracebacks
    stack = ''.join(traceback.format_stack(frame))

Pykka の並行処理に関しては、上記のようにしてデバッグができる。

スレッドプールっぽいものを作って並行処理をしてみる

次はもうちょっと実践的に、スレッドプールっぽく並行処理をしてみよう。 とはいえ、あまり汎用的には作らずベタ書きだけど。

次のサンプルコードでは、先ほどの例と同じように MyActor に処理に時間のかかるメソッド process() を用意している。 アクターを配列で 10 個生成した上で、タスクを 20 回呼び出してみよう。 ただし、通常のスレッドプールのような形で終わったものから新しいタスクを割り当てる、というものではない。 あらかじめ、各アクターに決まった番号のタスクを割り当てるような形になっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time
import random

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id):
        """時間がかかる処理を模したメソッド"""
        # 処理時間をランダムに用意する
        sleep_sec = random.randint(1, 10) / 10
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1} s'.format(processing_id, sleep_sec))
        # 処理の ID を返す
        return processing_id


def main():
    # スレッドプールのようにアクターを用意する
    actor_proxy_pool = [MyActor.start().proxy() for _ in range(10)]

    # それぞれのアクターに仕事を割り当てていく
    # かかる時間はランダムにする
    futures = [
        actor_proxy_pool[i % len(actor_proxy_pool)].process(i)
        for i in range(20)
    ]

    # すべての処理が完了するのを待ち合わせる
    answers = pykka.get_all(futures)

    print('*** Calculation Completed ***')

    # 完了した結果を出力する
    for answer in answers:
        print('ID: {0}'.format(answer))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

実行結果は次の通り。 各タスクが順番バラバラで実行されているのが分かる。

$ python tpoolpykka.py
Completed: ID 3 in 0.1 s
Completed: ID 0 in 0.3 s
Completed: ID 7 in 0.3 s
Completed: ID 13 in 0.2 s
Completed: ID 1 in 0.5 s
Completed: ID 4 in 0.5 s
Completed: ID 5 in 0.6 s
Completed: ID 2 in 0.7 s
Completed: ID 8 in 0.7 s
Completed: ID 10 in 0.4 s
Completed: ID 17 in 0.4 s
Completed: ID 6 in 0.8 s
Completed: ID 9 in 1.0 s
Completed: ID 15 in 0.5 s
Completed: ID 11 in 0.6 s
Completed: ID 12 in 0.5 s
Completed: ID 16 in 0.5 s
Completed: ID 14 in 0.9 s
Completed: ID 19 in 0.5 s
Completed: ID 18 in 0.9 s
*** Calculation Completed ***
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9
ID: 10
ID: 11
ID: 12
ID: 13
ID: 14
ID: 15
ID: 16
ID: 17
ID: 18
ID: 19

出力の最後を見て分かる通り、実行結果として得られる配列の順番は変わっていない。 これは、単純に内部でやっているのが順番に Future#get() をしていく、という処理だからだ。 とはいえ、最終的に得られる結果の順番が入れ替わらないという特性は並行処理をする上で優れた特徴だと思う。 場合によっては終わったものから返ってきて、後からソートし直さないといけないことなんかもあるし。

ラウンドロビンで処理するメッセージルータを実装してみる

次は、お試しで作ってみたメッセージルータについて。 これはほんとに試しに作ってみただけなのでプルーフオブコンセプトと思ってほしい。

次のサンプルコードでは MyActor をラップするメッセージルータの RoundRobinRouter を定義している。 RoundRobinRouter では、内部的に MyActor のプロキシを複数生成して、処理を振り分ける。 先ほどのべた書きのスレッドプールを、もうちょっと洗練させた感じ。 RoundRobinRouter は、まず処理をフォワードしたいアクターと同時並行数 (アクターをいくつ生成するか) を指定して起動する。 その上で RoundRobinRouter#forward() メソッドに呼び出したいメソッド名と引数を指定するだけ。 注意点としては、返り値が二重の Future になっているところ。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time
import random

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id):
        """時間がかかる処理を模したメソッド"""
        # 処理時間をランダムに用意する
        sleep_sec = random.randint(1, 10) / 10
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1}s by {2}'.format(
            processing_id,
            sleep_sec,
            self.actor_urn,
        ))
        # 処理の ID を返す
        return processing_id


class RoundRobinRouter(pykka.ThreadingActor):
    """ラップしたアクターにラウンドロビンで処理を割り当てていくメッセージルータ"""

    def __init__(self, actor_type, size):
        super(RoundRobinRouter, self).__init__()

        # アクターのプロキシを生成しておく
        self._proxy_pool = [actor_type.start().proxy()
                            for _ in range(size)]
        # 次に処理させるアクターを決めるジェネレータ
        self._index_gen = self._next_actor_index()

    def forward(self, name, *args, **kwargs):
        # 次の処理させるアクターのインデックス
        actor_index = next(self._index_gen)
        # 次の処理させるアクターのプロキシ
        actor_proxy = self._proxy_pool[actor_index]
        # フォワード対象のアトリビュートをプロキシから取り出す
        attribute = getattr(actor_proxy, name)
        # 引数を渡して呼び出す
        return attribute(*args, **kwargs)

    def _next_actor_index(self):
        """次の処理対象のアクターのインデックスを生成するジェネレータ"""
        i = 0
        while True:
            yield i
            # 0, 1, 2, 3 ... 0, 1, 2, 3 と繰り返す
            i = (i + 1) % len(self._proxy_pool)


def main():
    # 五つの MyActor をラップしたルータを生成する
    router_ref = RoundRobinRouter.start(MyActor, 5)
    router_proxy = router_ref.proxy()

    futures = [router_proxy.forward('process', i)
               for i in range(10)]

    # このルータを経由させると Future の二段重ねになる
    inner_futures = pykka.get_all(futures)
    answers = pykka.get_all(inner_futures)

    print('*** Calculation Completed ***')

    for answer in answers:
        print('ID: {0}'.format(answer))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 タスクを処理したアクターの識別子も同時に出力するようになっている。 見ると、タスクの識別子が 5 つの周期で同じアクターによって処理されていることが分かる。 上手く動作しているようだ。

$ python routerpykka.py
Completed: ID 1 in 0.3s by urn:uuid:a5d3f2e6-6d59-4351-9be8-e47023428e99
Completed: ID 2 in 0.6s by urn:uuid:ee6f93cf-66df-4500-b416-84defff8b195
Completed: ID 3 in 0.6s by urn:uuid:c6003463-87fc-4988-b8bb-34186e867630
Completed: ID 0 in 0.7s by urn:uuid:9a2fe8bc-49fe-4b19-96d5-6658ecae3677
Completed: ID 7 in 0.1s by urn:uuid:ee6f93cf-66df-4500-b416-84defff8b195
Completed: ID 4 in 0.8s by urn:uuid:0e3b5668-6825-43f5-bd95-9cf2c706a03d
Completed: ID 6 in 0.5s by urn:uuid:a5d3f2e6-6d59-4351-9be8-e47023428e99
Completed: ID 9 in 0.3s by urn:uuid:0e3b5668-6825-43f5-bd95-9cf2c706a03d
Completed: ID 5 in 0.8s by urn:uuid:9a2fe8bc-49fe-4b19-96d5-6658ecae3677
Completed: ID 8 in 1.0s by urn:uuid:c6003463-87fc-4988-b8bb-34186e867630
*** Calculation Completed ***
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9

まとめ

今回は Python でアクターモデルを実装するためのフレームワークである Pykka を試してみた。

アクターモデルというのは、並行処理のプログラミングモデルの一つ。 ただし、マルチスレッドやマルチプロセスよりも、抽象度の高い概念になっている。 対比するとしたら共有メモリやロック、RPC などがそれに当たると思う。

アクターモデルの概念を理解する上で重要なのは、アクターを構成する三つの要素を理解すること。 一つ目がメッセージキュー、二つ目がメッセージハンドラ、三つ目がそれらをディスパッチするスレッド。 生成されたアクターには、それぞれにこの三つの要素が必ず備わっている。 メッセージキューとスレッドはあらかじめアクターごとに一つずつ生成されるし、メッセージハンドラは自分で定義する。

アクターモデルでは、内部で競合を起こさないようにアクターの内外が非同期の壁で分断されている。 アクターの内部に、外側から同期的に直接触れることはできない。 必ず、触るとしたら Future を経由することになる。 同期的に触っているように見えたとしても、内部的には必ず経由している。

アクターモデルで並行処理を実現するには、アクターを横にたくさん並べてタスクを割り当てていく。 ただし、それぞれのタスクはアトミックに作られていないといけない。 アクターモデルで競合が起こらないというのは、あくまで「低レイヤーで内部的には」という但し書きがつく。 また、ロックがないからといって、デッドロックが起こらないというわけでもない。

Pykka の場合、Akka の機能を全て揃えているわけではないので、割りとよく使いそうな機能でも自分で書く必要がある。 また、正直アプリケーションで直接 Pykka を使うというシチュエーションは、あまり無いのかもしれないと思った。 なぜなら Pykka の提供する API は、直接使うにはあまりに低レイヤーすぎるからだ。 なので、Pykka をベースにした何らかのフレームワークをまず書いて、その上でアプリケーションを組むことになりそうな印象を受けた。

いじょう。

Python: Keras/TensorFlow の学習を GPU で高速化する (Ubuntu 16.04 LTS)

以前、このブログで Keras/TensorFlow の学習スピードを GPU を使って速くする記事を書いた。 ただし、このとき使った OS は Mac OS X (macOS Sierra) だった。

blog.amedama.jp

とはいえ NVIDIA の dGPU を積んだ Mac がどれだけあるんだというと、正直なかなか無いと思う。 実際にやってみるとしたら Linux だよねということで、今回は Ubuntu 16.04 LTS を使う場合について書く。

インストールの手順については次の公式ドキュメントをベースに進める。

Installing TensorFlow on Ubuntu  |  TensorFlow

環境について

今回使った OS のバージョンなどは次の通り。

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.2 LTS"
$ uname -r
4.4.0-66-generic

また、学習に GPU を使う場合はハードウェアの環境にも制約があるので、ここで記述しておく。 まず、当たり前だけど大前提として CUDA をサポートしている NVIDIA のグラフィックカードが必ず必要になる。 ちなみに TensorFlow のドキュメントを見ると一部 Radeon も対応しているらしいことが書かれていたけど、試してはいない。

さらに、NVIDIA のグラフィックカードなら何でも良いのかというと、そういうわけでもない。 次の NVIDIA 公式ページを見てほしい。 ここには NVIDIA の各 GPU ごとの Compute Capability という数値が載っている。

CUDA GPUs | NVIDIA Developer

Compute Capability という字面だけを見ると、性能を表す数値のような印象を受けるけど、そうではない。 数値はパフォーマンスとは関係がなく、あくまで GPU の世代というか載っている機能のバージョンらしい。 問題は、この数値が少なくとも 3.0 以上ないと現行の CUDA 8.0 がサポートしていない、という点だ。

また、PyPI で配布されている TensorFlow の GPU 版バイナリパッケージは CUDA 8.0 でビルドされている。 なので、もちろん動かす環境にも CUDA 8.0 が必要になる。 以上のことから Compute Capability が 3.0 以上の GPU を用意すべきだ。

ちなみに、ドキュメントを見ると一応 TensorFlow は今のところ CUDA 7.5 もサポートしているらしい。 CUDA 7.5 であれば Compute Capability は 2.0 以上をサポートしている。 ただし、そのときは自分でソースコードからビルドしなければいけない。 それについて、この記事では扱わない。

今回 GPU には GeForce GTX 1050 Ti を用意した。 これの Compute Capability は 6.1 となっている。 具体的なグラフィックカードは次のもの。

MSI GeForce GTX 1050 Ti 4G OC グラフィックスボード VD6191

MSI GeForce GTX 1050 Ti 4G OC グラフィックスボード VD6191

GPU のチョイスについては、現時点 (2017/3) で最もコストパフォーマンスが高くなるものを選んだ。 PassMark というベンチマークのスコアを値段で割った値を考えると GeForce GTX 1050 Ti が一番高くなる。 あとは、CPU に比べると GPU の学習速度はそれこそ一桁か、下手すると二桁は違ってくる。 つまり、GPU が載っていること自体がまずは重要で、性能はその次かなという感じ。 ようするに 60 秒かかった処理が 6 秒になるのは大きく違うけど、6 秒が 3 秒になってどれだけ嬉しいか?っていう。 それもあってハイエンドモデルではなくエントリーモデルを選ぶことにした。

グラフィックカードを刺して GPU を Ubuntu 16.04 LTS が認識することを確認しておこう。

$ lspci | grep -i nvidia
01:00.0 VGA compatible controller: NVIDIA Corporation Device 1c82 (rev a1)
01:00.1 Audio device: NVIDIA Corporation Device 0fb9 (rev a1)

CUDA Toolkit をインストールする

インストールの第一歩として CUDA Toolkit をインストールする。 そのために、まずは NVIDIA 公式サイトを参照する。

developer.nvidia.com

インストール方法は色々とあるけど、ここでは CUDA のリモートリポジトリを APT に登録するやり方を取る。 これなら、後からマイナーアップデートがあったときにも自動で更新できる。 上記ウェブサイトで、次のように操作して deb ファイルのダウンロード URL を取得しよう。

Linux > x86_64 > Ubuntu > 16.04 > deb (network)

上記で得られた URL から deb ファイルをダウンロードしてくる。

$ wget http://developer.download.nvidia.com/compute/cuda/repos/ubuntu1604/x86_64/cuda-repo-ubuntu1604_8.0.61-1_amd64.deb

deb ファイルをインストールする。 これで CUDA のリポジトリが APT パッケージシステムに登録される。

$ sudo dpkg -i cuda-repo-ubuntu1604_8.0.61-1_amd64.deb
$ sudo apt-get update

これで APT 経由で CUDA をインストールできるようになる。 ちなみに、インストールにはとても長い時間がかかるので覚悟しておこう。 先ほどのインストール方法でローカルインストールを選んだ場合には 1.9GB のファイルを落とすことになる。 かかる時間は、推して知るべし。

$ sudo apt-get -y install cuda

上記が終わると、依存パッケージとして一緒に NVIDIA のグラフィックドライバなんかも入る。

$ dpkg -l | grep nvidia-[0-9]
ii  nvidia-375                            375.26-0ubuntu1                          amd64        NVIDIA binary driver - version 375.26
ii  nvidia-375-dev                        375.26-0ubuntu1                          amd64        NVIDIA binary Xorg driver development files

cuDNN をインストールする

次に cuDNN をインストールする。

cuDNN のインストールには、あらかじめ NVIDIA のサイトでデベロッパー登録が必要になる。 登録を済ませると cuDNN をダウンロードできるようになる。

https://developer.nvidia.com/rdp/cudnn-download

cuDNN の現行のバージョンは 5.1 となっている。 ダウンロードに使うパッケージは CUDA Toolkit のバージョンと組み合わせになっている。 今回であれば v5.1 + CUDA 8.0 のパッケージを選ぼう。

Download cuDNN v5.1 (Jan 20, 2017), for CUDA 8.0 > cuDNN v5.1 Library for Linux

ダウンロードしてきたら、それを CUDA のインストールされたディレクトリに放り込む。

$ sudo tar xvf cudnn-8.0-linux-x64-v5.1.tgz -C /usr/local
cuda/include/cudnn.h
cuda/lib64/libcudnn.so
cuda/lib64/libcudnn.so.5
cuda/lib64/libcudnn.so.5.1.10
cuda/lib64/libcudnn_static.a

libcupti-dev をインストールする

次に The CUDA Profiling Tools Interface もインストールしておく。

$ sudo apt-get -y install libcupti-dev

ここまで終わったら、一旦再起動しておこう。

$ sudo shutdown -r now

Keras/TensorFlow (GPU)

さて、次はいよいよ Keras/TensorFlow をインストールする…と言いたいところだけど、その前に。 システムの Python 実行環境を汚したくないので Python の仮想環境を作れるようにしよう。

ここでは virtualenvwrapper を使う。

$ sudo apt-get -y install virtualenvwrapper

シェルを起動し直す。

$ exec $SHELL

これで Python 仮想環境が作れるようになった。 tensorflow-with-gpu という名前で Python3 の仮想環境を作る。

$ mkvirtualenv tensorflow-with-gpu -p $(which python3)

これで最低限のパッケージが入った、システムから独立した仮想環境ができた。

(tensorflow-with-gpu) $ pip list --format=columns
Package       Version
------------- --------
appdirs       1.4.3
packaging     16.8
pip           9.0.1
pkg-resources 0.0.0
pyparsing     2.2.0
setuptools    34.3.2
six           1.10.0
wheel         0.30.0a0

ここに Keras と GPU 版 TensorFlow をインストールする。

(tensorflow-with-gpu) $ pip install keras tensorflow-gpu

インストールすると、こんな感じ。

(tensorflow-with-gpu) $ pip list --format=columns | grep -i -e keras -e tensorflow
Keras          1.2.2
tensorflow-gpu 1.0.1

ちなみに、以降はシェルのプレフィックスを表記しないけど Python 仮想環境上で実行し続けている。

GPU で学習させる

まずはベンチマーク用のアプリケーションで使うデータセット (MNIST) をダウンロードしよう。 Python の REPL を起動する。

$ python

Keras の mnist パッケージをインポートする。

>>> from keras.datasets import mnist
Using TensorFlow backend.
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcublas.so.8.0 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcudnn.so.5 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcufft.so.8.0 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcurand.so.8.0 locally

load_data() 関数でデータセットのダウンロードが始まる。 これには少し時間がかかる。

>>> mnist.load_data()
Downloading data from https://s3.amazonaws.com/img-datasets/mnist.pkl.gz
...
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0]]], dtype=uint8), array([7, 2, 1, ..., 4, 5, 6], dtype=uint8)))

終わったら REPL から抜ける。

>>> exit()

続いて学習のベンチマークに使うアプリケーションをダウンロードしてくる。

$ curl -O https://raw.githubusercontent.com/fchollet/keras/master/examples/mnist_cnn.py
$ echo 'K.clear_session()' >> mnist_cnn.py

ベンチマーク用のアプリケーションを実行する。 これは MNIST データセットを CNN で認識するものになっている。 初めての実行だと GPU の環境を整えるので学習を始まるまで時間がかかるかも。

$ time python mnist_cnn.py
Using TensorFlow backend.
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcublas.so.8.0 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcudnn.so.5 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcufft.so.8.0 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcurand.so.8.0 locally
X_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples
Train on 60000 samples, validate on 10000 samples
Epoch 1/12
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:910] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
I tensorflow/core/common_runtime/gpu/gpu_device.cc:885] Found device 0 with properties:
name: GeForce GTX 1050 Ti
major: 6 minor: 1 memoryClockRate (GHz) 1.455
pciBusID 0000:01:00.0
Total memory: 3.94GiB
Free memory: 3.84GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:906] DMA: 0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:916] 0:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:975] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 1050 Ti, pci bus id: 0000:01:00.0)
60000/60000 [==============================] - 8s - loss: 0.3661 - acc: 0.8871 - val_loss: 0.0864 - val_acc: 0.9726
Epoch 2/12
60000/60000 [==============================] - 6s - loss: 0.1337 - acc: 0.9604 - val_loss: 0.0613 - val_acc: 0.9806
Epoch 3/12
60000/60000 [==============================] - 6s - loss: 0.1046 - acc: 0.9694 - val_loss: 0.0524 - val_acc: 0.9838
Epoch 4/12
60000/60000 [==============================] - 6s - loss: 0.0879 - acc: 0.9737 - val_loss: 0.0428 - val_acc: 0.9858
Epoch 5/12
60000/60000 [==============================] - 6s - loss: 0.0755 - acc: 0.9773 - val_loss: 0.0393 - val_acc: 0.9870
Epoch 6/12
60000/60000 [==============================] - 6s - loss: 0.0683 - acc: 0.9801 - val_loss: 0.0368 - val_acc: 0.9875
Epoch 7/12
60000/60000 [==============================] - 6s - loss: 0.0648 - acc: 0.9806 - val_loss: 0.0367 - val_acc: 0.9888
Epoch 8/12
60000/60000 [==============================] - 6s - loss: 0.0593 - acc: 0.9820 - val_loss: 0.0353 - val_acc: 0.9889
Epoch 9/12
60000/60000 [==============================] - 6s - loss: 0.0548 - acc: 0.9833 - val_loss: 0.0328 - val_acc: 0.9890
Epoch 10/12
60000/60000 [==============================] - 6s - loss: 0.0529 - acc: 0.9845 - val_loss: 0.0316 - val_acc: 0.9897
Epoch 11/12
60000/60000 [==============================] - 6s - loss: 0.0508 - acc: 0.9848 - val_loss: 0.0308 - val_acc: 0.9902
Epoch 12/12
60000/60000 [==============================] - 6s - loss: 0.0484 - acc: 0.9852 - val_loss: 0.0298 - val_acc: 0.9899
Test score: 0.0297728348608
Test accuracy: 0.9899

real  1m28.273s
user  1m39.684s
sys   0m8.740s

全体の実行が一分半ほどで終わった。 一回のエポックは大体 6 秒ほどで終わっている。

ちなみに GPU の仕事っぷりは nvidia-smi コマンドから見られる。

$ nvidia-smi
Sun Mar 12 20:24:56 2017
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 375.26                 Driver Version: 375.26                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX 105...  Off  | 0000:01:00.0      On |                  N/A |
| 35%   38C    P0    58W /  75W |   3862MiB /  4037MiB |     77%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID  Type  Process name                               Usage      |
|=============================================================================|
|    0      1510    G   /usr/lib/xorg/Xorg                              59MiB |
|    0      3374    C   python                                        3799MiB |
+-----------------------------------------------------------------------------+

毎秒ごとに表示を更新したいときは、こんな感じで。

$ nvidia-smi -l 1

CPU で学習させる

ちなみに対比として CPU を使ったときの例も載せておく。

使った CPU は次の通り。 第一世代 Core i7 なので、なかなか古い。 現行世代なら Core i3 くらいの性能らしい。

$ cat /proc/cpuinfo | grep -i name | head -n 1
model name      : Intel(R) Core(TM) i7 CPU         860  @ 2.80GHz
$ cat /proc/cpuinfo | grep -i name | wc -l
8

余談ながら、当然チップセットやマザーボードも古いので今回使ったグラフィックカードを認識するかドキドキした。

先ほどインストールした GPU 版 TensorFlow をアンインストールして CPU 版 TensorFlow をインストールする。

$ pip uninstall -y tensorflow-gpu
$ pip install tensorflow

先ほどと同じようにベンチマーク用のアプリケーションを実行する。

$ time python mnist_cnn.py
Using TensorFlow backend.
X_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples
Train on 60000 samples, validate on 10000 samples
Epoch 1/12
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
60000/60000 [==============================] - 117s - loss: 0.3733 - acc: 0.8858 - val_loss: 0.0885 - val_acc: 0.9723
Epoch 2/12
60000/60000 [==============================] - 117s - loss: 0.1347 - acc: 0.9596 - val_loss: 0.0635 - val_acc: 0.9799
Epoch 3/12
60000/60000 [==============================] - 115s - loss: 0.1031 - acc: 0.9693 - val_loss: 0.0565 - val_acc: 0.9818
Epoch 4/12
60000/60000 [==============================] - 115s - loss: 0.0887 - acc: 0.9740 - val_loss: 0.0448 - val_acc: 0.9852
Epoch 5/12
60000/60000 [==============================] - 114s - loss: 0.0780 - acc: 0.9773 - val_loss: 0.0415 - val_acc: 0.9868
Epoch 6/12
60000/60000 [==============================] - 113s - loss: 0.0707 - acc: 0.9788 - val_loss: 0.0376 - val_acc: 0.9870
Epoch 7/12
60000/60000 [==============================] - 112s - loss: 0.0651 - acc: 0.9808 - val_loss: 0.0352 - val_acc: 0.9893
Epoch 8/12
60000/60000 [==============================] - 112s - loss: 0.0604 - acc: 0.9818 - val_loss: 0.0352 - val_acc: 0.9888
Epoch 9/12
60000/60000 [==============================] - 112s - loss: 0.0555 - acc: 0.9840 - val_loss: 0.0332 - val_acc: 0.9884
Epoch 10/12
60000/60000 [==============================] - 112s - loss: 0.0542 - acc: 0.9835 - val_loss: 0.0322 - val_acc: 0.9895
Epoch 11/12
60000/60000 [==============================] - 111s - loss: 0.0495 - acc: 0.9853 - val_loss: 0.0319 - val_acc: 0.9897
Epoch 12/12
60000/60000 [==============================] - 110s - loss: 0.0477 - acc: 0.9857 - val_loss: 0.0326 - val_acc: 0.9893
Test score: 0.032642004689
Test accuracy: 0.9893

real  22m54.715s
user  123m6.744s
sys   46m53.296s

今度は 23 分ほどかかった。

GPU が 1 分 28 秒で終わったのに対して CPU では 22 分 54 秒かかっている。 つまり、GPU を使うことで学習が 15.6 倍高速化できた。 一回のエポック当たりでは 19.1 倍速くなっている。

まとめ

今回は OS として Ubuntu 16.04 LTS を使って Keras/TensorFlow の学習を GPU で高速化してみた。 CUDA がサポートしている GPU は Compute Capability という数値で表現されている。 この数値が大きいものほど、新しい世代のものになっている。 使う GPU については NVIDIA で、少なくとも Compute Capability が 3.0 以上のものを選ぼう。 もちろん、将来の CUDA で最小要件の Compute Capability は上がっていくはずなので、なるべく大きい方が良い。

いじょう。

MSI GeForce GTX 1050 Ti 4G OC グラフィックスボード VD6191

MSI GeForce GTX 1050 Ti 4G OC グラフィックスボード VD6191

Ubuntu 16.04 LTS の NIC に固定 IP アドレスを振る

たまに設定する機会があると、毎回どうやるんだっけとなって調べるので。

今回使った環境は次の通り。

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.2 LTS"
$ uname -r
4.4.0-66-generic

NIC は enp9s0 という名前で認識されている。

/etc/network/interfaces を編集する

NIC の設定は /etc/network/interfaces という設定ファイルで行う。 初期設定だと、こんな感じで DHCP を使うようになっていると思う。

$ cat /etc/network/interfaces
# This file describes the network interfaces available on your system
# and how to activate them. For more information, see interfaces(5).

source /etc/network/interfaces.d/*

# The loopback network interface
auto lo
iface lo inet loopback

# The primary network interface
auto enp9s0
iface enp9s0 inet dhcp

これを、こんな感じにする。 設定方法を静的 (static) にした上で、アドレスとネットマスク、ゲートウェイと DNS サーバを指定する。

$ cat /etc/network/interfaces
# This file describes the network interfaces available on your system
# and how to activate them. For more information, see interfaces(5).

source /etc/network/interfaces.d/*

# The loopback network interface
auto lo
iface lo inet loopback

# The primary network interface
auto enp9s0
# DHCP はコメントアウトしておくとか
# iface enp9s0 inet dhcp
# 静的アドレスを設定する
iface enp9s0 inet static
address 192.168.0.10
netmask 255.255.255.0
gateway 192.168.0.1
dns-nameservers 192.168.0.1

編集するときはエディタを使っても良いし、例えば今回の編集内容であればこんな感じのコマンドで書き換えても良い。

$ sudo sed -i "/iface enp9s0 inet dhcp/d" /etc/network/interfaces
$ cat << 'EOF' | sudo tee -a /etc/network/interfaces > /dev/null
iface enp9s0 inet static
address 192.168.0.10
netmask 255.255.255.0
gateway 192.168.0.1
dns-nameservers 192.168.0.1
EOF

後は再起動して設定を反映する。

$ sudo shutdown -r now

いじょう。

Ubuntu 16.04 LTS で ISO ファイルをメディアに書き込む

例えばインストール用の ISO ファイルをダウンロードしてきて、それを DVD-R とかに焼くときにやり方について。

今回使った環境は次の通り。

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.2 LTS"
$ uname -r
4.4.0-66-generic

今回使った環境では、メディアに書き込むためのデバイスは /dev/sr0 で認識されている。

$ ls /dev/sr0
/dev/sr0

書き込む ISO ファイルは Ubuntu のインストールメディアにした。

$ file ubuntu-14.04.5-server-amd64.iso
ubuntu-14.04.5-server-amd64.iso: DOS/MBR boot sector ISO 9660 CD-ROM filesystem data (DOS/MBR boot sector) 'Ubuntu-Server 14.04.5 LTS amd64' (bootable); partition 2 : ID=0xef, start-CHS (0x3ff,254,63), end-CHS (0x3ff,254,63), startsector 271196, 4544 sectors

まずは書き込むのに使うツールの入ったパッケージをインストールする。

$ sudo apt-get -y install growisofs

書き込みは growisofs コマンドでやる。 引数として、デバイスと書き込む ISO ファイルを指定してやる。

$ growisofs -Z <dev>=<isofile>

今回の例なら、こんな感じ。 内部的には dd コマンドが使われているみたいだ。

$ growisofs -Z /dev/sr0=ubuntu-14.04.5-server-amd64.iso
Executing 'builtin_dd if=ubuntu-14.04.5-server-amd64.iso of=/dev/sr0 obs=32k seek=0'
/dev/sr0: "Current Write Speed" is 16.4x1352KBps.
   23035904/649068544 ( 3.5%) @4.8x, remaining 2:15 RBU 100.0% UBU   0.2%
   52232192/649068544 ( 8.0%) @6.3x, remaining 1:31 RBU 100.0% UBU  99.4%
   81821696/649068544 (12.6%) @6.4x, remaining 1:16 RBU 100.0% UBU  99.6%
  111935488/649068544 (17.2%) @6.5x, remaining 1:11 RBU 100.0% UBU  99.6%
  142442496/649068544 (21.9%) @6.6x, remaining 1:04 RBU  99.9% UBU  99.6%
  175964160/649068544 (27.1%) @7.3x, remaining 0:56 RBU 100.0% UBU  99.6%
  209977344/649068544 (32.4%) @7.4x, remaining 0:52 RBU 100.0% UBU  99.6%
  244482048/649068544 (37.7%) @7.5x, remaining 0:46 RBU 100.0% UBU  99.6%
  276889600/649068544 (42.7%) @7.0x, remaining 0:41 RBU 100.0% UBU  99.6%
  312344576/649068544 (48.1%) @7.7x, remaining 0:37 RBU 100.0% UBU  99.6%
  348291072/649068544 (53.7%) @7.8x, remaining 0:32 RBU 100.0% UBU  99.6%
  384729088/649068544 (59.3%) @7.9x, remaining 0:28 RBU 100.0% UBU  99.4%
  418971648/649068544 (64.5%) @7.4x, remaining 0:24 RBU 100.0% UBU  99.4%
  456359936/649068544 (70.3%) @8.1x, remaining 0:20 RBU 100.0% UBU  99.6%
  494206976/649068544 (76.1%) @8.2x, remaining 0:15 RBU 100.0% UBU  99.4%
  532545536/649068544 (82.0%) @8.3x, remaining 0:12 RBU 100.0% UBU  99.2%
  571375616/649068544 (88.0%) @8.4x, remaining 0:07 RBU 100.0% UBU  99.4%
  607944704/649068544 (93.7%) @7.9x, remaining 0:04 RBU 100.0% UBU  99.4%
  647725056/649068544 (99.8%) @8.6x, remaining 0:00 RBU   4.0% UBU  99.6%
builtin_dd: 316928*2KB out @ average 7.2x1352KBps
/dev/sr0: flushing cache
/dev/sr0: updating RMA
/dev/sr0: closing session
/dev/sr0: reloading tray

あとは焼きあがったメディアを使ってインストールするだけ。

Ubuntu 16.04 LTS のキーボードを日本語 (JIS) に変更する

2018-08-25 追記: Ubuntu 18.04 LTS でも同じ操作で変更できた

たまにインストールしたとき間違えて英語 (US) に設定して後から直すことになるので。

使った環境は次の通り。

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.2 LTS"
$ uname -r
4.4.0-66-generic

キーボードの設定を変更するには dpkg-reconfigure コマンドを keyboard-configuration 引数をつけて実行する。

$ sudo dpkg-reconfigure keyboard-configuration

表示されるウィザードに沿って設定を入力していく。 二番目の設定で Japanese にするのがポイント。

Generic 105-key (Intel) PC > Japanese > Japanese > The default for the keyboard layout > No compose key

いじょう。