CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: scikit-learn で主成分分析 (PCA) してみる

主成分分析 (PCA) は、主にデータ分析や統計の世界で使われる道具の一つ。 データセットに含まれる次元が多いと、データ分析をするにせよ機械学習をするにせよ分かりにくさが増える。 そんなとき、主成分分析を使えば取り扱う必要のある次元を圧縮 (削減) できる。 ただし、ここでいう圧縮というのは非可逆なもので、いくらか失われる情報は出てくる。 今回は、そんな主成分分析を Python の scikit-learn というライブラリを使って試してみることにした。

今回使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.6.1

下準備

あらかじめ、今回使う Python のパッケージを pip でインストールしておく。

$ pip install matplotlib scipy scikit-learn

主成分分析の考え方

前述した通り、主成分分析はデータセットの次元を圧縮 (削減) するのに用いる。 ただし、実は元のデータセットと分析結果で次元数を変えないようにすることもできる。 それじゃあ圧縮できていないじゃないかという話になるんだけど、実は分析結果では次元ごとの性質が異なっている。 これは、例えるなら「すごく重要な次元・それなりに重要な次元・あんまり重要じゃない次元」と分かれているような感じ。 そして、その中から重要な次元をいくつかピックアップして使えば、次元の数が減るというわけ。 もちろん、そのとき選ばれなかった「あんまり重要じゃない次元」に含まれていた情報は失われてしまう。

では、主成分分析ではどのような基準で次元の重要さを決めるのだろうか。 これは、データの分散が大きな次元ほど、より多くの情報を含んでいると考える。 分散というのは、データのバラつきの大きさを表す統計量なので、ようするに値がバラけている方が価値が大きいと捉える。 分散が小さいというのは、ようするにどの値も似たり寄ったりで差異を見出すのが難しいということ。 それに対し、分散が大きければ値ごとの違いも見つけやすくなる。

例えば、次のような x 次元と y 次元から成る、二次元のデータを考えてみよう。 この中には (1, 2), (2, 4), (3, 6) という三点の要素が含まれる。

f:id:momijiame:20170402110001p:plain

ここで x 次元の標本分散は  \frac{2}{3} で、y 次元の標本分散は  \frac{8}{3} になる。 主成分分析の考え方でいくと y 次元の方が分散が大きいので、より重要といえる。 ただ、上記のデータは二つの次元が相関しているようだ。 相関しているということは、似たような情報を含む次元が二つある、とも捉えることができる。

では、上記で相関に沿って新しい次元を作ってみたら、どうなるだろうか?

f:id:momijiame:20170402121107p:plain

値の間隔はピタゴラスの定理から  \sqrt{5} となることが分かる。 これは x 次元の間隔である 1 や y 次元の間隔である 2 よりも大きい。

f:id:momijiame:20170402121423p:plain

間隔が大きいということは分散も大きくなることが分かる。

続いては、先ほどの相関に沿って作った次元とは直交する軸でさらに新しい次元を作ってみよう。

f:id:momijiame:20170402121737p:plain

今度は、新しい次元からそれぞれの要素を見てみよう。 このとき、全ての要素は同じ場所にいるので間隔は 0 になっている。 つまり、分散も 0 なので、この次元には全然情報が含まれていないことになる。

上記の作業によって、情報がたくさん含まれる次元と、全く含まれない次元に分けることができた。 あとは、最初に作った情報がたくさん含まれる次元だけを使えば、二次元を一次元に圧縮できたことになる。 実は、これこそ正に主成分分析でしている作業を表している。

実際に試してみる

やっていることの概要は分かったので、次は実際にその通りになるのか試してみよう。 データセットとしては、まずは先ほどの三点をそのまま使ってみる。

次のサンプルコードでは、相関した三点のデータを主成分分析している。 そして、元のデータと分析結果を散布図にした。 また、分析結果の各次元の寄与率というものも出力している。 scikit-learn では

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from matplotlib import pyplot as plt
from sklearn.decomposition import PCA


def main():
    # y = 2x
    features = np.array([[1, 2], [2, 4], [3, 6]])

    # グラフ描画サイズを設定する
    plt.figure(figsize=(12, 4))

    # 元データをプロットする
    plt.subplot(1, 2, 1)
    plt.scatter(features[:, 0], features[:, 1])
    plt.title('origin')
    plt.xlabel('x')
    plt.ylabel('y')

    # 主成分分析する
    pca = PCA()
    pca.fit(features)

    # 分析結果を元にデータセットを主成分に変換する
    transformed = pca.fit_transform(features)

    # 主成分をプロットする
    plt.subplot(1, 2, 2)
    plt.scatter(transformed[:, 0], transformed[:, 1])
    plt.title('principal component')
    plt.xlabel('pc1')
    plt.ylabel('pc2')

    # 主成分の次元ごとの寄与率を出力する
    print(pca.explained_variance_ratio_)

    # グラフを表示する
    plt.show()


if __name__ == '__main__':
    main()

それでは、上記のサンプルコードを実行してみよう。 出力されたリストは、分析結果の各次元の寄与率を表している。

$ python pca.py 
[ 1.  0.]

寄与率というのは、前述した「各次元の重要度」を表したもの。 その次元に元のデータからどれだけの割合で情報が含まれているかで、全てを足すと 1 になるように作られている。 つまり、主成分分析をした結果から全ての次元を使えば、元のデータセットから情報の損失は起こらない。 ただし、それだと次元も圧縮できないことになる。

先ほどの出力結果を見ると、最初の次元に寄与率が全て集中している。 つまり、最初の次元だけに全ての情報が含まれていることになる。 これは、先ほど主成分分析の概要を図示したときに得られた結論と一致している。

では、上記をグラフでも確認してみよう。 次のグラフは、主成分分析の前後を散布図で比べたもの。 左が元データで、右が分析結果となっている。

f:id:momijiame:20170402123229p:plain

見て分かる通り、先ほど図示した内容と一致している。 ちなみに、主成分分析では分析結果として得られた次元のことを第 n 主成分と呼ぶ。 例えば、最初に作った次元なら第一主成分、次に作った次元なら第二主成分という風になる。 今回の例では第一主成分に必要な情報が全て集中した。

アイリスデータセットを主成分分析してみる

次はもうちょっとだけそれっぽいデータを使ってみることにする。 みんな大好きアイリスデータセットは、あやめという花の特徴量と品種を含んでいる。 この特徴量は四次元なので、別々のグラフに分けたりしないと本来は可視化できない。 今回は、主成分分析を使って二次元に圧縮して可視化してみることにしよう。

次のサンプルコードではアイリスデータセットの次元を主成分分析している。 そして、分析結果から第二主成分までを取り出して散布図に可視化した。 また、同時に寄与率と累積寄与率を出力するようにした。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from matplotlib import pyplot as plt
from sklearn.decomposition import PCA
from sklearn import datasets


def main():
    dataset = datasets.load_iris()

    features = dataset.data
    targets = dataset.target

    # 主成分分析する
    pca = PCA(n_components=2)
    pca.fit(features)

    # 分析結果を元にデータセットを主成分に変換する
    transformed = pca.fit_transform(features)

    # 主成分をプロットする
    for label in np.unique(targets):
        plt.scatter(transformed[targets == label, 0],
                    transformed[targets == label, 1])
    plt.title('principal component')
    plt.xlabel('pc1')
    plt.ylabel('pc2')

    # 主成分の寄与率を出力する
    print('各次元の寄与率: {0}'.format(pca.explained_variance_ratio_))
    print('累積寄与率: {0}'.format(sum(pca.explained_variance_ratio_)))

    # グラフを表示する
    plt.show()


if __name__ == '__main__':
    main()

それでは、上記を実行してみよう。 コンソールには寄与率と累積寄与率が表示される。

$ python pcairis.py 
各次元の寄与率: [ 0.92461621  0.05301557]
累積寄与率: 0.9776317750248034

寄与率は先ほど説明した通りで、ここでの累積寄与率は使うことにした次元の寄与率を足したもの。 ようするに、今回の場合なら第一主成分と第二主成分の寄与率を足したものになっている。 累積寄与率は約 0.97 で、ようするに第二主成分までで元のデータの 97% が表現できていることが分かる。

同時に、次のような散布図が表示される。 これは、第一主成分と第二主成分を x 軸と y 軸に取って散布図にしたもの。 点の色の違いは品種を表している。

f:id:momijiame:20170402125421p:plain

本来なら四次元の特徴量で複数の散布図になるところを、主成分分析を使うことで一つの散布図にできた。

まとめ

今回は Python の scikit-learn を使って主成分分析について学んだ。 データセットに含まれる次元が多いと、データ分析なら分かりにくいし、機械学習なら計算量が増えてしまう。 そんなとき主成分分析を使えば、重要さが異なる新たな次元を含んだデータが分析結果として得られる。 その中から、重要なものをいくつかピックアップして使えば、データの損失を最小限に抑えて次元を減らすことができる。

参考文献

実践 機械学習システム

実践 機械学習システム

Python: ソケットプログラミングのアーキテクチャパターン

今回はソケットプログラミングについて。 ソケットというのは Unix 系のシステムでネットワークを扱うとしたら、ほぼ必ずといっていいほど使われているもの。 ホスト間の通信やホスト内での IPC など、ネットワークを抽象化したインターフェースになっている。

そんな幅広く使われているソケットだけど、取り扱うときには色々なアーキテクチャパターンが考えられる。 また、比較的低レイヤーな部分なので、効率的に扱うためにはシステムコールなどの、割りと OS レベルに近い知識も必要になってくる。 ここらへんの話は、体系的に語られているドキュメントが少ないし、あっても鈍器のような本だったりする。 そこで、今回はそれらについてざっくりと見ていくことにした。

尚、今回はプログラミング言語として Python を使うけど、何もこれは特定の言語に限った話ではない。 どんな言語を使うにしても、あるいは表面上は抽象化されたインターフェースで隠蔽されていても、内部的にはソケットが使われている。 例えば Java サーブレットや Ruby on Rails で Web アプリケーションを書くにしても、それが動くサーバの通信部分はソケットで書かれていることだろう。

動作確認に使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.6.1

もくじ

ブロッキングとノンブロッキングについて

まず、ソケットを扱うには大きく分けて「ブロッキングで使うか・ノンブロッキングで使うか」を選ぶことになる。 その中でも基本となる使い方はブロッキングで、こちらの方が逐次的なプログラミングモデルとなりやすいので理解も早い。 ではノンブロッキングにはどんなメリットがあるかというと、こちらは通信相手が増えたときのパフォーマンス面 (スケーラビリティ) で優れている。

このエントリでは、ソケットの扱い方をブロッキング・ノンブロッキングと分けた上で、それぞれにどんなアーキテクチャパターンが考えられるか見ていく。 しかし、その前にまずは事前知識としてソケットにおけるブロッキング・ノンブロッキングという概念自体の説明から入ろう。

まず、ソケットというオブジェクトに対してはデータの読み込みや書き込みを指示できる。 読み込まれるデータは通信相手から送られてきたもので、書き込まれたデータは通信相手に送り届けられる。 しかし、それらのデータを読み書きする指示は即座に完了するわけではない。 具体的には、ソケットには読み書きができる状態とできない状態があるためだ。 読み書きができないというのは、わんこそばで例えると口の中でもぐもぐしている最中で、次のそばを口に入れられない状態を指す。

では、もしも読み込みや書き込みができない状態にあるソケットに対して、その指示を出したらどう振る舞うのだろうか。 ブロッキング・ノンブロッキングの違いというのは、正にこの「どう振る舞うか」の違いを指す。 ブロッキングというのは、読み書きができる状態になるまで、じっとそのまま待つことを意味している。 それに対して、ノンブロッキングは読み書きができない状態にあるときエラーを出してすぐに処理を終了する。

これで、ソケットのブロッキング・ノンブロッキングの違いについて説明できた。

ソケットをブロッキングで扱う場合

さて、前フリが長くなったけど、ここからは具体的なアーキテクチャパターンを見ていくことにしよう。 初めは、基本的な使い方であるソケットをブロッキングで扱う場合から。

今回、サンプルコードとして題材にするのはエコーサーバにした。 エコーサーバというのは、クライアントから送られてきたデータを、そのままオウム返しでクライアントに送り返すサーバのことをいう。

実装については IPv4 のループバックアドレスを使って TCP:37564 ポートでクライアントからの接続を待ち受けるようにした。 ループバックアドレスとは何か、みたいな TCP/IP 的な概念についての説明は省く。 これは、今回の主題として扱うアーキテクチャパターンという範疇からは、ちょっと外れるため。

あと、クライアントサイドについても自分で書いても良いんだけど、今回はありものを使うことにした。 ここでは netcat というツールを使うことにしよう。 netcatHomebrew を使ってインストールする。

$ brew install netcat

Homebrew が入っていないときは入れる感じで。

シングルスレッド

まずは、ソケットがブロッキングで、それをシングルスレッドで扱う場合から考えてみよう。 これが、最もシンプルなパターンといえるはず。

早速だけどサンプルコードを以下に示す。 それぞれの処理の内容はコメントで補足している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket


def main():
    # IPv4/TCP のソケットを用意する
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 'Address already in use' の回避策 (必須ではない)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    # 待ち受けるアドレスとポートを指定する
    # もし任意のアドレスで Listen したいときは '' を使う
    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    # クライアントをいくつまでキューイングするか
    serversocket.listen(128)

    while True:
        # クライアントからの接続を待ち受ける (接続されるまでブロックする)
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            # クライアントソケットから指定したバッファバイト数だけデータを受け取る
            try:
                message = clientsocket.recv(1024)
                print('Recv: {}'.format(message))
            except OSError:
                break

            # 受信したデータの長さが 0 ならクライアントからの切断を表す
            if len(message) == 0:
                break

            # 受信したデータをそのまま送り返す (エコー)
            sent_message = message
            while True:
                # 送信できたバイト数が返ってくる
                sent_len = clientsocket.send(sent_message)
                # 全て送れたら完了
                if sent_len == len(sent_message):
                    break
                # 送れなかった分をもう一度送る
                sent_message = sent_message[sent_len:]
            print('Send: {}'.format(message))

        # 後始末
        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


if __name__ == '__main__':
    main()

サーバにおけるソケットプログラミングの基本的な流れは次の通り。

  • ソケットを作る (socket)
  • 待ち受けるアドレスとポートを指定する (bind)
  • 接続キューの長さを指定して接続を待ち受ける (listen)
  • 接続してきたクライアントからソケットを取得する (accept)
  • 取得したクライアントのソケットに対して読み書きする (send/recv)

このパターンでは、上記の一連の処理を一つのスレッドでこなしていく。

それではサンプルコードを実行してみよう。 これで、エコーサーバが起動する。 とはいえ、クライアントが接続しない限り特に何も表示されることはない。

$ python singlethread.py

続いて、別のターミナルを開いて netcat を実行しよう。 次のようにすると、先ほど起動したエコーサーバに接続できる。

$ nc localhost 37564

すると、エコーサーバを起動したターミナルに、クライアントからの接続を表す表示が出るはず。

$ python singlethread.py
New client: 127.0.0.1:63917

さらに netcat のターミナルで文字列を入力して Enter すると、同じ内容がまた表示される。 これは、送信した内容がエコーサーバからオウム返しで返ってきたことを意味する。

$ nc localhost 37564
hogehoge
hogehoge

エコーサーバのターミナルを見ると、送受信した内容が表示されている。

$ python singlethread.py
New client: 127.0.0.1:63917
Recv: b'hogehoge\n'
Send: b'hogehoge\n'

netcat は Ctrl キーと C キーを一緒に押すことで終了できる。 これでサーバとの接続も切断される。

$ nc localhost 37564
hogehoge
hogehoge
^C

サーバの方にもクライアントとの接続が切れた旨が表示された。

$ python singlethread.py
New client: 127.0.0.1:63917
Recv: b'hogehoge\n'
Send: b'hogehoge\n'
Recv: b''
Bye-Bye: 127.0.0.1:63917

ここまで見た限り、このパターンで何の問題も無いように見える。 しかし、クライアントを二つにすると問題点が分かってくる。

サーバを一旦終了して、もう一度起動し直そう。 ちなみにサーバについても Ctrl-C で終了できる。

$ python singlethread.py

そして、改めて別のターミナルから netcat で接続する。

$ nc localhost 37564

クライアントが一つなら、サーバは接続を正常に受け付ける。

$ python singlethread.py
New client: 127.0.0.1:49746

では、さらにもう一つターミナルを開いて netcat で接続してみると、どうだろうか?

$ nc localhost 37564

今度は、サーバ側に接続を受け付けたメッセージが表示されない。

$ python singlethread.py
New client: 127.0.0.1:49746

そう、ソケットをブロッキングかつシングルスレッドで扱う場合、二つ以上のクライアントを同時に上手くさばくことができない。 なぜなら、唯一のスレッドは最初のクライアントからデータを読み書きする仕事に従事しているからだ。

先ほどのサンプルコードでいえば以下、クライアントからの新たなデータの到来を待ち続けて (ブロックして) いることだろう。

message = clientsocket.recv(1024)

唯一のスレッドが一つのクライアントにかかりきりなので、以下の別のクライアントからの接続を受け付ける処理は実行されない。 クライアントからの接続は、ソケットの接続キューに積まれたまま放置プレイを食らう。

clientsocket, (client_address, client_port) = serversocket.accept()

今かかりきりになっている相手との通信が終わるまで、別のクライアントは受け付けることができないというわけ。

マルチスレッド

ソケットをブロッキングで扱うとき、シングルスレッドでは二つ以上のクライアントを上手くさばけないことが分かった。 そこで、次はクライアントを処理するスレッドを複数用意してマルチスレッドにしてみよう。

先ほどの内容に手を加えて、マルチスレッドにしたサンプルコードは次の通り。 先ほどと同じ処理についてはコメントを省いて、新たに追加したり変更したところにコメントを書いている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket
import threading


def client_handler(clientsocket, client_address, client_port):
    """クライアントとの接続を処理するハンドラ"""
    while True:
        try:
            message = clientsocket.recv(1024)
            print('Recv: {0} from {1}:{2}'.format(message,
                                                  client_address,
                                                  client_port))
        except OSError:
            break

        if len(message) == 0:
            break

        sent_message = message
        while True:
            sent_len = clientsocket.send(sent_message)
            if sent_len == len(sent_message):
                break
            sent_message = sent_message[sent_len:]
        print('Send: {0} to {1}:{2}'.format(message,
                                            client_address,
                                            client_port))

    clientsocket.close()
    print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    while True:
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        # 接続してきたクライアントを処理するスレッドを用意する
        client_thread = threading.Thread(target=client_handler,
                                         args=(clientsocket,
                                               client_address,
                                               client_port))
        # 親 (メイン) スレッドが死んだら子も道連れにする
        client_thread.daemon = True
        # スレッドを起動する
        client_thread.start()


if __name__ == '__main__':
    main()

先ほどとの違いは、クライアントとの接続に対してスレッドが一対一で生成されるところだ。 プログラムが起動された直後に生成されるメインスレッドは、クライアントからの接続を受け付ける仕事だけに専念している。 実際に受け付けたクライアントとの接続の処理は、新たに生成した子スレッドに任せるわけだ。

では、上記サンプルコードの動作を確認してみよう。 まずはエコーサーバを起動する。

$ python multithread.py

そして、二つのターミナルからエコーサーバに接続してみよう。

$ nc localhost 37564

すると、今度は二つのクライアントから接続を受け付けた旨が表示された。

$ python multithread.py
New client: 127.0.0.1:51027
New client: 127.0.0.1:51028

それぞれのクライアントのターミナルで文字列を入力すると、ちゃんとエコーバックされるし上手く動いている。

$ python multithread.py
New client: 127.0.0.1:51027
New client: 127.0.0.1:51028
Recv: b'hogehoge\n' from 127.0.0.1:51027
Send: b'hogehoge\n' to 127.0.0.1:51027
Recv: b'hogehoge\n' from 127.0.0.1:51028
Send: b'hogehoge\n' to 127.0.0.1:51028

マルチスレッド (スレッドプール)

先ほどの例では、クライアントを処理する部分をマルチスレッド化することで、二つ以上のクライアントを同時にさばけるようになった。 しかし、実は先ほどのやり方ではクライアントの接続数がどんどん増えていくと問題になってくることがある。 それは、メモリの使用量とコンテキストスイッチにかかるコストの増加だ。

スレッドというのは、新たに作ろうとするとそれ用のコンテキストを必要とする。 この、コンテキストというのは各スレッドの状態を保持しておくために必要なメモリに他ならない。 スレッドあたりのコンテキストのサイズは状態や実装に依存するので、これくらいとはなかなか言いづらいものがある。 とはいえ、一つ一つが小さくてもクライアントの接続数が増えれば決してばかにできないサイズになってくる。

また、コンテキストスイッチというのは、CPU が処理しているスレッドを OS が途中で切り替える作業のことをいう。 まず、CPU というのは同時に処理できるスレッドの数が、あらかじめ製品ごとに決まっている。 例えば、今売られている Intel や AMD の x86-64 アーキテクチャの CPU を例に挙げてみよう。 この場合は、物理コアあたり 1 または 2 スレッドである場合が多い。 つまり、同時に処理できるスレッドには機械的な上限がある。 ちなみに、物理コアあたり同時 2 スレッドの製品については、OS からは論理コアが 2 つあるように扱われる。

にも関わらず、実のところ私たちは普段からそれよりも多くのスレッドを同時に起動して扱っている。 なぜそんなことができるかというと、CPU が実行するスレッドを、OS が途中で別のスレッドに入れ替えているためだ。 この入れ替えは、ごく短時間で行われているので、見かけ上はたくさんのスレッドが同時に実行できているかのように見える。

しかしながら、この入れ替え作業には短時間ながらもちろん時間がかかる。 そして、CPU で同時に処理できるスレッドの数に対して、OS が扱うスレッドの数が増えてくると、その頻度も上がる。 これによって、切り替え作業に要する時間が増えて、だんだんと CPU が非効率な使われ方をしてしまうことがある。

先ほどのサンプルコードでは、まさに上記の二つが問題となる。 なぜなら、生成するスレッドの数に上限を設けていないからだ。 上限がないと、クライアントの数に応じてどんどんスレッドが増え続ける。 結果として、メモリを消費すると共に CPU が非効率な使われ方をしてしまう。

スレッドが多すぎるとまずいという問題点が分かったところで、次はスレッドを生成する数に上限を設けてみよう。 具体的には、あらかじめスレッドを既定数だけ生成して、それらに仕事を割り振る形にする。 この手法は、一般にスレッドプールと呼ばれている。 スレッドプールの中にいる各スレッドは、ワーカースレッドと呼ばれる。

次のサンプルコードはスレッドプールを使った実装になっている。 生成されたワーカーがサーバソケットからの接続を奪い合う形になる。 これなら、あらかじめ決まった数を超えるスレッドは生成されないので、前述したような問題は発生しない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import socket
import threading


def worker_thread(serversocket):
    """クライアントとの接続を処理するハンドラ"""
    while True:
        # クライアントからの接続を待ち受ける (接続されるまでブロックする)
        # ワーカスレッド同士でクライアントからの接続を奪い合う
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {0} from {1}:{2}'.format(message,
                                                      client_address,
                                                      client_port))
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                sent_len = clientsocket.send(sent_message)
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {0} to {1}:{2}'.format(message,
                                                client_address,
                                                client_port))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    # サーバソケットを渡してワーカースレッドを起動する
    NUMBER_OF_THREADS = 10
    for _ in range(NUMBER_OF_THREADS):
        thread = threading.Thread(target=worker_thread, args=(serversocket, ))
        thread.daemon = True
        thread.start()

    while True:
        # メインスレッドは遊ばせておく (ハンドラを処理させても構わない)
        time.sleep(1)


if __name__ == '__main__':
    main()

ただし、上記にも注意点がある。 それは、あらかじめプールしたスレッド数を超えてクライアントをさばくことができない、という点だ。 プール数を超えた接続があったときは、他のクライアントとの接続が切れるまで、ソケットは処理されないままキューに積まれてしまう。

実行結果については、先ほどと変わらないので省略する。

ちなみに、蛇足だけど Mac OS X に関してはプロセスごとに生成できるスレッド数があらかじめ制限されているようだ。 例えば、次のようなサンプルコードを用意して、たくさんのスレッドを起動してみる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time


def loop():
    """各スレッドは特に何もしない"""
    while True:
        time.sleep(1)


def main():
    # ネイティブスレッドをたくさん起動してみる
    for _ in range(10000):
        t = threading.Thread(target=loop)
        t.daemon = True
        t.start()
        # 動作中のスレッド数を出力する
        print(threading.active_count())


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると 2049 個目のスレッドを起動するところで例外になった。

$ python toomanythreads.py
...(省略)...
2046
2047
2048
Traceback (most recent call last):
  File "toomanythreads.py", line 25, in <module>
    main()
  File "toomanythreads.py", line 19, in main
    t.start()
  File "/Users/amedama/.pyenv/versions/3.6.1/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

このリミットは、どうやら次のカーネルパラメータでかかっているらしい。

$ sysctl -n kern.num_taskthreads
2048

Mac OS X においては、スレッドの生成数に上限を設けないと、メモリの枯渇などを待つことなくサーバが突然死することになる。

マルチプロセス (プロセスプール)

先ほどの例では、スレッドプールを使うことで同時に処理できるクライアントの数を増やしつつ、リソースの消費を抑えることができた。 しかしながら、実はここまでの例では、パフォーマンスを求める上で、まだ使い切れていないリソースが残っている。 それは、複数の CPU コアだ。

実は Python の標準的な処理系である CPython には、とある制限が存在している。 それは、一つのプロセスで同時に実行できるスレッドの数が一つだけ、というもの。 一般的に、これはグローバルインタプリタロック (Global Interpreter Lock, GIL) と呼ばれている。 この制限は、Python/C API で書かれた拡張モジュールを Python から扱いやすくするために存在する。

この GIL がある処理系では、CPU に複数の論理コアがあったとしても、同時に使われるのが一つだけに制限されてしまう。 つまり、先ほどの例では、マルチスレッドにしても実際に使われている CPU 論理コアは同時に一つだけだった。 ようするに、複数のスレッドを OS が一つの CPU 論理コアの上で切り替え (コンテキストスイッチ) ながら動作する。

ちなみに、コンピュータの処理には、大きく分けて入出力 (I/O) が主体になるものと計算 (CPU) が主体になるものがある。 CPU が主体となるのは、例えば科学計算のようなもの。 それに対して、今回の例であるエコーサーバのようなプログラムは、CPU の処理がほとんどない。 処理時間のほとんどを I/O の待ちに使っていることから、入出力が主体のプログラムといえる。

つまり、今回取り扱うエコーサーバは CPU の処理能力がボトルネックになりにくい。 ようするに、あえて CPU の能力を最大限引き出すようなコードにする必然性は薄い。 しかしながら、アーキテクチャパターンの紹介という意味では重要だと思う。 なので、その方法についても記述しておこう。

その方法というのは、具体的にはプログラムを複数のプロセスで動かす。 前述した通り GIL はプロセスあたりの同時実行スレッド数を一つに制限するというものだった。 なので、プロセスを複数立ち上げてしまえば、同時実行スレッド数をプログラム全体で見たときに増やすことができる。

次のサンプルコードでは、スレッドの代わりにプロセスを複数起動 (マルチプロセス) している。 Python でマルチプロセスを扱う方法としては、例えば標準ライブラリの multiprocessing モジュールがある。 起動するプロセスの数は CPU の論理コア数と同じにした。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import socket
import multiprocessing


def worker_process(serversocket):
    """クライアントとの接続を処理するハンドラ"""
    while True:
        # クライアントからの接続を待ち受ける (接続されるまでブロックする)
        # ワーカープロセス同士でクライアントからの接続を奪い合う
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {0} from {1}:{2}'.format(message,
                                                      client_address,
                                                      client_port))
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                sent_len = clientsocket.send(sent_message)
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {0} to {1}:{2}'.format(message,
                                                client_address,
                                                client_port))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    # プロセス数は CPU のコア数前後に合わせると良い
    NUMBER_OF_PROCESS = multiprocessing.cpu_count()
    # サーバソケットを渡してワーカープロセスを起動する
    for _ in range(NUMBER_OF_PROCESS):
        process = multiprocessing.Process(target=worker_process,
                                          args=(serversocket, ))
        # デーモンプロセスにする (親プロセスが死んだら子も道連れに死ぬ)
        process.daemon = True
        # プロセスを起動する
        process.start()

    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()

マルチプロセスを使うときの注意点についても見ていこう。 これは、マルチスレッドの場合とほとんど変わらない。 つまり、プロセスを作るにもコンテキストが必要であり、コンテキストスイッチが起こるということだ。 そのため、同時に起動するプロセス数は制限してやる必要がある。 しかも、必要なリソースの量はスレッドに比べるとずっと多い。 そのため、一般的には起動するプロセス数は CPU の論理コアの数前後が良いとされている。

また、マルチプロセス固有の問題としては、プロセス間での値の共有が挙げられる。 マルチスレッドであれば、同一プロセス内でメモリ空間を共有していた。 なので、例えばグローバル変数の値をスレッド間で情報を共有する手段にもできた。 それに対し、マルチプロセスではプロセス同士でメモリ空間は共有していない。 そのため、別の何らかの IPC を使って情報をやり取りしなければいけない。

尚、繰り返しになるけどマルチプロセスにする必要があるのは、あくまで GIL があることに由来している。 もし、これがない処理系やプログラミング言語を使うなら、単にマルチスレッドにするだけで大丈夫。 ちゃんと CPU のコアを使い切ってくれるはず。

マルチプロセス・マルチスレッド

先ほどの例では、プロセスを複数立ち上げることで CPU の能力を使い切れるようにした。 ただし、マルチプロセスではあるものの、それぞれのプロセスでは一つのスレッドしか動かしていなかった。 そこで、次は各プロセスの中をマルチスレッドにしてみよう。 これなら、マルチプロセスかつマルチスレッドになって CPU と I/O の両方を上手く使い切れるはず。

次のサンプルコードでは、各ワーカープロセスの中でさらにスレッドプールを動かしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import socket
import multiprocessing
import threading


def worker_thread(serversocket):
    """クライアントとの接続を処理するハンドラ (スレッド)"""
    while True:
        # クライアントからの接続を待ち受ける (接続されるまでブロックする)
        # ワーカースレッド同士でクライアントからの接続を奪い合う
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {0} from {1}:{2}'.format(message,
                                                      client_address,
                                                      client_port))
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                sent_len = clientsocket.send(sent_message)
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {0} to {1}:{2}'.format(message,
                                                client_address,
                                                client_port))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def worker_process(serversocket):
    """クライアントとの接続を受け付けるハンドラ (プロセス)"""

    # サーバソケットを渡してワーカースレッドを起動する
    NUMBER_OF_THREADS = 10
    for _ in range(NUMBER_OF_THREADS):
        thread = threading.Thread(target=worker_thread, args=(serversocket, ))
        thread.start()
        # ここではワーカーをデーモンスレッドにする必要はない (死ぬときはプロセスごと逝くので)

    while True:
        # ワーカープロセスのメインスレッドは遊ばせておく
        time.sleep(1)


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
    for _ in range(NUMBER_OF_PROCESSES):
        process = multiprocessing.Process(target=worker_process,
                                          args=(serversocket, ))
        process.daemon = True
        process.start()

    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()

実行結果については、これまで変わらないので省略する。

ひとまず、ソケットをブロッキングで扱う場合のアーキテクチャパターンについては、これでおわり。

ソケットをノンブロッキングで扱う場合

続いては、ソケットをノンブロッキングで扱う場合について見ていこう。 前述した通り、ソケットをノンブロッキングで扱うと、読み書きなどを指示してもブロックが起きない。 その代わり、もし読み書きの準備ができていないときはその旨がエラーで返ってくる。

とりあえずノンブロッキングにしてみよう

最初に、ノンブロッキングなソケットをブロッキングっぽく扱ったときの挙動を確認しておこう。 具体的に、どんなことが起こるのだろうか?

次のサンプルコードは、最初に示したシングルスレッドのサーバに一行だけ手を加えている。 それは、サーバソケットを setblocking() メソッドでノンブロッキングモードにしているところだ。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # ソケットをノンブロッキングモードにする
    serversocket.setblocking(False)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    while True:
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {}'.format(message))
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                sent_len = clientsocket.send(sent_message)
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {}'.format(message))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、すぐに例外が出て終了してしまう。

$ python nonblocking.py
Traceback (most recent call last):
  File "nonblocking.py", line 48, in <module>
    main()
  File "nonblocking.py", line 22, in main
    clientsocket, (client_address, client_port) = serversocket.accept()
  File "/Users/amedama/.pyenv/versions/3.6.1/lib/python3.6/socket.py", line 205, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 35] Resource temporarily unavailable

上記の BlockingIOError という例外は、まだ準備が整っていないにも関わらず指示が出されたときに上がる。 今回の場合だと、クライアントからの接続が到着していないのに accept() メソッドを呼び出している。 ブロッキングモードのソケットなら、そのまま到着するまで待ってくれていた。 それに対し、ノンブロッキングモードでは呼び出した時点で到着していないなら即座に例外となってしまう。 正に、これがブロッキングとノンブロッキングの挙動の違い。

準備が整うまで待つ (ビジーループ)

先ほどの例で分かるように、ソケットをノンブロッキングで使うとブロッキングとは使い勝手が異なっている。 具体的には、ソケットの準備が整うのを勝手に待ってくれるわけではないので、自分で意図的に待たなければいけない。

では、どのようにすれば待つことができるだろうか。 一つのやり方としては、エラーが出なくなるまで定期的に実行してみる方法が考えられる。 この、何度も自分から試しに行くやり方はポーリングと呼ばれる。 その中でも、それぞれの試行間隔を全く空けないものはビジーループという。

次のサンプルコードではノンブロッキングなソケットをビジーループで待ちながら処理している。 ただし、あらかじめ言っておくと、このやり方は間違っている。 ソケットをノンブロッキングで扱うとき、こんなソースコードは書いちゃいけない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setblocking(False)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    while True:
        try:
            clientsocket, (client_address, client_port) = serversocket.accept()
        except (BlockingIOError, socket.error):
            # まだソケットの準備が整っていない
            continue

        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {}'.format(message))
            except (BlockingIOError,  socket.error):
                # まだソケットの準備が整っていない
                continue
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                try:
                    sent_len = clientsocket.send(sent_message)
                except (BlockingIOError,  socket.error):
                    # まだソケットの準備が整っていない
                    continue
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {}'.format(message))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


if __name__ == '__main__':
    main()

上記のサンプルコードは一応動作するものの、複数のクライアントを処理することができない。 それに、ビジーループを使っているとプロセスの CPU 使用率が 100% になってしまう。 繰り返しになるけど、ソケットをノンブロッキングで扱うとき、こんなソースコードは書いちゃだめ。

準備が整うまで待つ (イベントループ)

ビジーループでは色々と難しいことが分かったところで、次は実用的に待つ方法を見ていこう。 これには、イベントループや I/O 多重化と呼ばれる手法というかシステムコールを用いる。 システムコールというのは OS のカーネルに備わっている API のことだ。 ユーザランドのプログラムは、このシステムコールを呼び出すことで OS の機能が利用できる。

システムコールの中には、ソケットの状態を監視して、変更されたときにそれを通知してくれるものがある。 より正確には、監視できるものはファイルやソケットに汎用的に割り当てられるファイルディスクリプタだ。

イベントループにはいくつかの種類があるものの、ここでは古典的な select(2) を使うやり方を見ていく。 次のサンプルコードは、エコーサーバを select(2) システムコールで実装したもの。 ただし、先に断っておくと、これは実装している機能の割にコード量が多いし、逐次的でないから読みにくいと思う。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket
import select


# 読み取りが可能になるまで待っているソケットと、可能になったときに呼び出されるハンドラ・引数の対応を持つ
read_waiters = {}
# 書き込みが可能になるまで待っているソケットと、可能になったときに呼び出されるハンドラ・引数の対応を持つ
write_waiters = {}
# 接続してきたクライアントとの接続情報を格納する
connections = {}


def accept_handler(serversocket):
    """サーバソケットが読み取り可能になったとき呼ばれるハンドラ"""
    # 準備ができているので、すぐに accept() できる
    clientsocket, (client_address, client_port) = serversocket.accept()

    # クライアントソケットもノンブロックモードにする
    clientsocket.setblocking(False)

    # 接続してきたクライアントの情報を出力する
    # ただし、厳密に言えば print() もブロッキング I/O なので避けるべき
    print('New client: {0}:{1}'.format(client_address, client_port))

    # ひとまずクライアントの一覧に入れておく
    connections[clientsocket.fileno()] = (clientsocket,
                                          client_address,
                                          client_port)

    # 次はクライアントのソケットが読み取り可能になるまで待つ
    read_waiters[clientsocket.fileno()] = (recv_handler,
                                           (clientsocket.fileno(), ))

    # 次のクライアントからの接続を待ち受ける
    read_waiters[serversocket.fileno()] = (accept_handler, (serversocket, ))


def recv_handler(fileno):
    """クライアントソケットが読み取り可能になったとき呼ばれるハンドラ"""
    def terminate():
        """クライアントとの接続が切れたときの後始末"""
        # クライアント一覧から取り除く
        del connections[clientsocket.fileno()]
        # ソケットを閉じる
        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))

    # クライアントとの接続情報を取り出す
    clientsocket, client_address, client_port = connections[fileno]

    try:
        # 準備ができているので、すぐに recv() できる
        message = clientsocket.recv(1024)
    except OSError:
        terminate()
        return

    if len(message) == 0:
        terminate()
        return

    print('Recv: {0} to {1}:{2}'.format(message,
                                        client_address,
                                        client_port))

    # 次はクライアントのソケットが書き込み可能になるまで待つ
    write_waiters[fileno] = (send_handler, (fileno, message))


def send_handler(fileno, message):
    """クライアントソケットが書き込み可能になったとき呼ばれるハンドラ"""
    # クライアントとの接続情報を取り出す
    clientsocket, client_address, client_port = connections[fileno]

    # 準備ができているので、すぐに send() できる
    sent_len = clientsocket.send(message)
    print('Send: {0} to {1}:{2}'.format(message[:sent_len],
                                        client_address,
                                        client_port))

    if sent_len == len(message):
        # 全て送ることができたら、次はまたソケットが読み取れるようになるのを待つ
        read_waiters[clientsocket.fileno()] = (recv_handler,
                                               (clientsocket.fileno(), ))
    else:
        # 送り残している内容があったら、再度ソケットが書き込み可能になるまで待つ
        write_waiters[fileno] = (send_handler,
                                 (fileno, message[sent_len:]))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # ソケットをノンブロックモードにする
    serversocket.setblocking(False)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    # クライアントからの接続がくるまで待つ
    read_waiters[serversocket.fileno()] = (accept_handler, (serversocket, ))

    while True:
        # ソケットが読み取り・書き込み可能になるまで待つ
        rlist, wlist, _ = select.select(read_waiters.keys(),
                                        write_waiters.keys(),
                                        [],
                                        60)

        # 読み取り可能になったソケット (のファイル番号) の一覧
        for r_fileno in rlist:
            # 読み取り可能になったときに呼んでほしいハンドラを取り出す
            handler, args = read_waiters.pop(r_fileno)
            # ハンドラを実行する
            handler(*args)

        # 書き込み可能になったソケット (のファイル番号の一覧)
        for w_fileno in wlist:
            # 書き込み可能になったときに呼んでほしいハンドラを取り出す
            handler, args = write_waiters.pop(w_fileno)
            # ハンドラを実行する
            handler(*args)


if __name__ == '__main__':
    main()

Python では select(2) システムコールの薄いラッパとして select モジュールが使える。 このモジュールが提供する select() 関数には、ファイルディスクリプタの入ったリストを渡す。

ファイルディスクリプタというのは、名前だけ聞くと難しそうだけどただの整数に過ぎない。 これは、各ソケットやファイルなどを使うときに OS が割り当てた一意な整数を指している。 ようするに 10 とか 20 とかいう数字が、何らかのソケットやファイルなどを表す。 ソケットに割り当てられたファイルディスクリプタは fileno() メソッドで得られる。

select() 関数には、読み込みや書き込みの準備ができたら通知してほしいファイルディスクリプタを渡す。 そして select() 関数を呼び出すと、そこでブロックした後に、準備ができたファイルディスクリプタが返される。 返ってきたファイルディスクリプタは、既に読み書きができるようになっているので指示を出しても例外にはならない。

先ほどのサンプルコードでは、そのようにして準備ができたものに対して読み書きをしている。 ビジーループと比べると CPU を使い切ることもなく、複数のクライアントを処理できる。 また、大きなポイントとしてはシングルスレッドにも関わらず、複数のクライアントを処理できているところだ。 これはソケットをブロッキングで使っていたときとの大きな違いだろう。

ちなみに、今回使った select(2) システムコールにはパフォーマンス上の問題が知られている。 そのため、実用的な用途で使われることはそこまで多くない。 代わりに、BSD 系なら kqueue(2)、Linux であれば epoll(2) が用いられる。 ただし、select(2) なら大抵のプラットフォームで使えるので、それらに比べると移植性が高いというメリットはある。

また「ソケットやファイルなど」と前述した通り、実はブロッキング・ノンブロッキングという概念はソケットに限った話ではない。 ファイルやデバイスについてもノンブロッキングで扱うことはできる。 そして、これはノンブロッキングなソケットプログラミングをする上で重要な意味を持ってくる。 詳細は後述するものの、これはノンブロッキングとブロッキングを同じスレッドで混ぜて使うと問題が発生する、というもの。

尚、前述した通り先ほどのサンプルコードはシングルプロセス・シングルスレッドで動作している。 そのため、複数の CPU コアを使い切ることはできない。 使い切れるようにするときは、マルチプロセスにする必要がある。 もちろん、これは GIL の制約のためにプロセスを複数立ち上げる必要があるに過ぎない。 別の処理系やプログラミング言語であれば、単にマルチスレッドにするだけで良い。 いずれの場合でも、それぞれのスレッドごとにイベントループを用意する。

ノンブロッキング I/O をラップした API やライブラリを使う

先ほどの例ではイベントループのシステムコールを使ってノンブロッキングなソケットを処理してみた。 とはいえ、実際にシステムコールを直接使ってソケットプログラミングする機会は、あまりないと思う。 なぜなら、先ほどのサンプルコードを見て分かる通り、それらの API はそのままでは扱いにくい上にコード量も増えてしまうため。

実際には、イベントループをラップしたライブラリを使ってプログラミングすることになると思う。 どんなライブラリがあるかはプログラミング言語ごとに異なる。 例えば C 言語なら libev が有名だと思うし Python なら Twisted などがある。 また、Python に関しては 3.4 から標準ライブラリに asyncio というモジュールが追加された。 次は、この asyncio を使ってみることにしよう。

Python の asyncio には色んなレイヤーの API が用意されている。 それこそ、先ほどのシステムコールを直接使うのと大差ないようなコードも書ける。 しかし、それだとライブラリを使う意味がないので、もうちょっと抽象度の高いものを使ってみた。 次のサンプルコードでは asyncio を使ってエコーサーバを実装している。 コードを見て分かる通り、先ほどと比べるとだいぶコード量が減って読みやすくなっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio


class EchoServer(asyncio.Protocol):

    def connection_made(self, transport):
        """クライアントからの接続があったときに呼ばれるイベントハンドラ"""
        # 接続をインスタンス変数として保存する
        self.transport = transport

        # 接続元の情報を出力する
        client_address, client_port = self.transport.get_extra_info('peername')
        print('New client: {0}:{1}'.format(client_address, client_port))

    def data_received(self, data):
        """クライアントからデータを受信したときに呼ばれるイベントハンドラ"""
        # 受信した内容を出力する
        client_address, client_port = self.transport.get_extra_info('peername')
        print('Recv: {0} to {1}:{2}'.format(data,
                                            client_address,
                                            client_port))

        # 受信したのと同じ内容を返信する
        self.transport.write(data)
        print('Send: {0} to {1}:{2}'.format(data,
                                            client_address,
                                            client_port))

    def connection_lost(self, exc):
        """クライアントとの接続が切れたときに呼ばれるイベントハンドラ"""
        # 接続が切れたら後始末をする
        client_address, client_port = self.transport.get_extra_info('peername')
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))
        self.transport.close()


def main():
    host = 'localhost'
    port = 37564

    # イベントループを用意する
    ev_loop = asyncio.get_event_loop()

    # 指定したアドレスとポートでサーバを作る
    factory = ev_loop.create_server(EchoServer, host, port)
    # サーバを起動する
    server = ev_loop.run_until_complete(factory)

    try:
        # イベントループを起動する
        ev_loop.run_forever()
    finally:
        # 後始末
        server.close()
        ev_loop.run_until_complete(server.wait_closed())
        ev_loop.close()


if __name__ == '__main__':
    main()

注目すべきは、もはやソースコードの中に socket モジュールが登場していないところ。 それらは Protocol や Transport といった抽象的なオブジェクトに取って代わられている。

では、本当に内部でイベントループのシステムコールが使われているのかを調べてみよう。 まずは上記のサンプルコードを実行して、エコーサーバを起動する。

$ python asyncioserver.py

続いて、別のターミナルを開いたら上記エコーサーバが動いているプロセスの ID を調べる。

$ ps auxww | grep [a]syncioserver
amedama        31018   0.0  0.2  2430616  17344 s000  S+    7:58PM   0:00.16 python asyncioserver.py

そして、プロセスで発行されるシステムコールをトレースする dtruss コマンドを仕掛ける。

$ sudo dtruss -p 31018

準備ができたらクライアントを接続する。

$ nc localhost 37564

すると、次のように kevent(2) システムコールが発行されていることが分かる。 kevent(2) システムコールは kqueue(2) と共に用いるイベントループのためのシステムコール。

$ sudo dtruss -p 31018
SYSCALL(args)            = return
...
kevent(0x3, 0x0, 0x0)            = 0 0
getsockname(0xA, 0x7FFF50F55B00, 0x7FFF50F55AFC)                 = 0 0
setsockopt(0xA, 0x6, 0x1)                = 0 0
kevent(0x3, 0x0, 0x0)            = 0 0
write(0x1, "New client: 127.0.0.1:51822\n\0", 0x1C)              = 28 0
kevent(0x3, 0x10F8FB6F0, 0x1)            = 0 0
kevent(0x3, 0x0, 0x0)            = 0 0

どうやら、ちゃんと内部がノンブロッキングな世界になっていることが確認できた。 しかも、プラットフォームに応じたパフォーマンスに優れるイベントループをちゃんと使ってくれている。

ノンブロッキングとブロッキングは混ぜるな危険

ちなみに、ノンブロッキングなソケットプログラミングをする上では重要なポイントが一つある。 それは、ノンブロッキングなソケットを扱うスレッドで、ブロッキングな操作をしてはいけない、という点。 もちろん、前述した通りブロッキング・ノンブロッキングという概念はソケットに限った話ではない。 つまり、言い換えるとノンブロッキングな I/O とブロッキングな I/O は同じスレッドで混ぜてはいけない。

二つ前のセクションで登場した select システムコールを使ったサンプルコードを思い出してほしい。 あのサンプルコードでは、シングルスレッドで複数のクライアントをさばいていた。 では、もしその一つしかないスレッドが何処かでブロックしたら、何が起こるだろうか? これは、そのスレッドでさばいている全ての処理が、そこで停止してしまうことを意味する。 これは、ノンブロッキングな I/O を扱う上で登場する代表的な問題の一つ。

どのようなことが起こるかを実際に確かめてみよう。 次のサンプルコードでは、データを受信した際に time.sleep() 関数を使っている。 これには、実行したスレッドを指定した時間だけブロックさせる効果がある。 正に、ノンブロッキングなスレッドへのブロッキングな操作の混入といえる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import time


class EchoServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport

        client_address, client_port = self.transport.get_extra_info('peername')
        print('New client: {0}:{1}'.format(client_address, client_port))

    def data_received(self, data):
        client_address, client_port = self.transport.get_extra_info('peername')
        print('Recv: {0} to {1}:{2}'.format(data,
                                            client_address,
                                            client_port))

        # 何らかの処理で、イベントループのスレッドがブロックしてしまった!
        print('Go to sleep...')
        time.sleep(20)

        self.transport.write(data)
        print('Send: {0} to {1}:{2}'.format(data,
                                            client_address,
                                            client_port))

    def connection_lost(self, exc):
        client_address, client_port = self.transport.get_extra_info('peername')
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))
        self.transport.close()


def main():
    host = 'localhost'
    port = 37564

    ev_loop = asyncio.get_event_loop()

    factory = ev_loop.create_server(EchoServer, host, port)
    server = ev_loop.run_until_complete(factory)

    try:
        ev_loop.run_forever()
    finally:
        server.close()
        ev_loop.run_until_complete(server.wait_closed())
        ev_loop.close()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してエコーサーバを起動しよう。

$ python asyncblock.py

続いて別のターミナルから nc コマンドでサーバに接続したら適当な文字列を入力する。

$ nc localhost 37564
hogehoge

これでイベントループを回しているスレッドはブロックを起こした。

$ python asyncblock.py
New client: 127.0.0.1:51883
Recv: b'hogehoge\n' to 127.0.0.1:51883
Go to sleep...

すかさず別のターミナルから nc でクライアントを追加してみよう。

$ nc localhost 37564

すると、今度はサーバに新しいクライアントが追加された旨は表示されない。 エコーサーバ全体の処理が、一箇所で停止してしまっているからだ。

$ python asyncblock.py
New client: 127.0.0.1:51883
Recv: b'hogehoge\n' to 127.0.0.1:51883
Go to sleep...

もうしばらく待つと、スレッドのブロックが解除されて新しいクライアントの接続が受理される。

python asyncblock.py
New client: 127.0.0.1:51883
Recv: b'hogehoge\n' to 127.0.0.1:51883
Go to sleep...
Send: b'hogehoge\n' to 127.0.0.1:51883
New client: 127.0.0.1:51884

このように、ノンブロッキング I/O を扱うスレッドにブロッキング I/O のコードが混入すると、全てがそこで停止してしまう。

そして、真にこの問題が恐ろしいのは、混入に気づきにくい点かもしれない。 先ほどのサンプルコードは極端な例なので、使ってみるだけでも明確に変化を知覚できた。 しかしながら、実際にはブロッキング I/O の処理は人間にとって一瞬なので気づくことは難しいかもしれない。 にも関わらず、そのタイミングで一連の処理が全て停止していることに間違いはない。 結果として、パフォーマンスの低下をもたらす。

また、世の中のほとんどのライブラリはブロッキング I/O を使って実装されている。 例えば、外部の WebAPI を叩こうとそのまま requests でも使おうものなら、それだけでアウトだ。 それに、HTTP のような分かりやすい I/O 以外にもキューのような基本的な部品であっても操作をブロックしたりする。

つまり、新たに何かを使おうとしたら、それにブロックする操作が混入していないかをあらかじめ調べる必要がある。 さらに、ブロックする操作が含まれると分かったら、それをブロックしないようにする方法を模索しなきゃいけない。 以上のように、イベントループを中心に据えた非同期なフレームワークというのは、一般的な認識よりもずっと扱いが難しいと思う。

ブロッキング I/O が混入する問題へのアプローチについて

ノンブロッキング I/O を扱うスレッドにブロッキング I/O が混ざり込む問題に対するアプローチはいくつかある。 もちろん、混入しないように人間が頑張ってコードを見張る、というのは最も基本的なやり方の一つ。

それ以外には、プログラミング言語のレベルでブロッキング I/O を排除してしまうという選択肢もある。 これは例えば JavaScript (Node.js) が採用している。 Golang も、ネットワーク部分に関してはノンブロッキング I/O しか用意していないらしい。 初めからブロッキング I/O の操作が存在していないなら、そもそも混入することはない。

それ以外には、モンキーパッチを当てるというアプローチもある。 つまり、ブロッキング I/O を使うコードを、全てノンブロッキング I/O を使うように書き換えてしまう。 Python であれば、例えば EventletGevent といったサードパーティー製ライブラリがこれにあたる。

試しに Eventlet を使った例を見てみよう。 まずは Python のパッケージマネージャである pip を使って Eventlet をインストールしておく。

$ pip install eventlet

それでは Eventlet の魔法をお見せしよう。 次のサンプルコードは、最初に示したマルチスレッドの例に、たった二行だけコードを追加している。 その冒頭に追加した二行こそ、まさにモンキーパッチを当てるためのコードになっている。 たったこれだけで、ブロッキングだった世界がノンブロッキングな世界に書き換わってしまう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 標準ライブラリにモンキーパッチを当てる
# ブロッキング I/O を使った操作が裏側で全てノンブロッキング I/O を使うように書き換えられる
import eventlet
eventlet.monkey_patch()

import socket
import threading


def client_handler(clientsocket, client_address, client_port):
    """クライアントとの接続を処理するハンドラ"""
    while True:
        try:
            message = clientsocket.recv(1024)
            print('Recv: {0} from {1}:{2}'.format(message,
                                                  client_address,
                                                  client_port))
        except OSError:
            break

        if len(message) == 0:
            break

        sent_message = message
        while True:
            sent_len = clientsocket.send(sent_message)
            if sent_len == len(sent_message):
                break
            sent_message = sent_message[sent_len:]
        print('Send: {0} to {1}:{2}'.format(message,
                                            client_address,
                                            client_port))

    clientsocket.close()
    print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    while True:
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        client_thread = threading.Thread(target=client_handler,
                                         args=(clientsocket,
                                               client_address,
                                               client_port))
        client_thread.daemon = True
        client_thread.start()


if __name__ == '__main__':
    main()

本当にイベントループが使われているのか確かめてみることにしよう。 まずは、上記のサンプルコードを実行する。

$ python eventletserver.py

続いて別のターミナルを開いて、上記で実行しているプロセス ID を調べる。

$ ps auxww | grep [e]ventletserver
amedama         7796   0.0  0.1  2426888  19488 s000  S+    8:44PM   0:00.17 python eventletserver.p

dtruss コマンドでプロセス内で発行されるシステムコールをトレースする。

$ sudo dtruss -p 7796

クライアントからサーバに接続してみよう。

$ nc localhost 37564

すると、次のように dtruss の実行結果に kevent システムコールが登場している。 本当に、モンキーパッチを当てるだけでノンブロッキング I/O を使うようになった。

$ sudo dtruss -p 7796
SYSCALL(args)            = return
kevent(0x4, 0x101256710, 0x1)            = 0 0
accept(0x3, 0x7FFF5F8A7750, 0x7FFF5F8A774C)              = 7 0
ioctl(0x7, 0x20006601, 0x0)              = 0 0
ioctl(0x7, 0x8004667E, 0x7FFF5F8A7A04)           = 0 0
ioctl(0x7, 0x8004667E, 0x7FFF5F8A74E4)           = 0 0
write(0x1, "New client: 127.0.0.1:54132\n\0", 0x1C)              = 28 0
recvfrom(0x7, 0x7FE86782BC20, 0x400)             = -1 Err#35
kevent(0x4, 0x101256710, 0x1)            = 0 0
kevent(0x4, 0x7FE866D11020, 0x0)                 = 0 0
accept(0x3, 0x7FFF5F8A7750, 0x7FFF5F8A774C)              = -1 Err#35
kevent(0x4, 0x101256710, 0x1)            = 0 0

注目すべきは、逐次的なプログラミングモデルを保ったまま、それが実現できているところだろう。 asyncio の例でも、データの読み書きなどは逐次的に書くことができたものの、基本はイベントドリブンだった。 しかし Eventlet のコードでは、完全にブロッキング I/O を使っているときと同じように書くことができている。

これが一体どのようにして実現されているかというと、主にグリーンスレッドの寄与が大きい。 Eventlet では、カーネルで実装されたネイティブスレッドの代わりにユーザランドで実装されたグリーンスレッドを用いる。 グリーンスレッドには、実装によってコルーチン、軽量プロセス、協調スレッドなど色々な呼び方がある。

カーネルで実装されたネイティブスレッドとの大きな違いは、コンテキストスイッチのタイミングがプログラムで制御できるところにある。 ネイティブスレッドのコンテキストスイッチはカーネルのスケジューラ次第なので、基本的にプログラムからは制御できない。 それに対し、グリーンスレッドでは実行中のスレッドが自発的に処理を手放さない限りコンテキストスイッチが起こらない。 つまり、I/O などの外的な要因がない限りグリーンスレッドは決定論的に動作することを意味している。

Eventlet では、モンキーパッチを使うと既存のスレッドやソケットがインターフェースはそのままに書き換えられる。 そして、本来ならブロックするコードに処理が到達したタイミングでコンテキストスイッチが起こるように変化する。 コンテキストスイッチする先は、読み書きの準備が整った I/O を処理しているグリーンスレッドだ。 これは、先ほどシステムコールをトレースした通り、イベントループを使って判断している。 そして、コンテキストスイッチした元のグリーンスレッドは、イベントループを使って処理中の I/O が読み書きができるようになるまで待たされる。 ちなみに Golang はプログラミング言語のレベルで上記を実現していて、それは goroutine と呼ばれている。

このアーキテクチャでは、逐次的なプログラミングモデルを保ったままノンブロッキング I/O を使った恩恵が受けられる。 また、グリーンスレッドは一般的にネイティブスレッドよりもコンテキストに必要なメモリのサイズが小さい。 つまり、同時に多くのクライアントをさばきやすい。

ただし、Eventlet のようなモンキーパッチを使ったアプローチには抵抗がある人も多いかもしれない。 実際のところ Eventlet にはクセが全くないとは言えないし、よく分からずに使うのはやめた方が良いと思う。 ただし、名誉のために言っておくと Eventlet は OpenStack のような巨大なプロジェクトでも使われている実績のあるライブラリだ。

ちなみに、モンキーパッチでは一つだけブロッキング I/O の混入を防げないところがある。 それは Python/C API を使って書かれた拡張モジュールだ。 コンパイル済みの拡張モジュールに対しては、個別に対応しない限り自動でモンキーパッチが効くことはない。 これは、典型的には Python/C API で書かれたデータベースドライバで問題になることが多い。

まとめ

今回はソケットプログラミングにおいて、どういったアーキテクチャが考えられるかについて見てきた。 まず、ソケットは大きく分けてブロッキングで使うかノンブロッキングで使うかという選択肢がある。

ブロッキングは、逐次的なプログラミングモデルで扱いやすいことから理解もしやすい。 ただし、複数のクライアントをさばくにはマルチスレッドやマルチプロセスにする必要がある。 それらは必要なコンテキストのサイズやスイッチのコストも大きいことから、スケーラビリティの面で問題となりやすい。

それに対し、ノンブロッキングはイベントドリブンなプログラミングモデルとなりやすいことから理解が難しい。 しかしながら、イベントループを使うことでシングルスレッドでも複数のクライアントを効率的にさばける。

また、ブロッキング・ノンブロッキングというのはソケットに限った概念ではない。 ファイルなども同じようにノンブロッキングで扱うことができる。

ノンブロッキングで I/O を扱うときの注意点としては、同じスレッドをブロックさせてはいけない、というところ。 言い換えると、イベントループを回しているスレッドにブロッキングな I/O のコードを混入させてはいけない。 もし混入するとパフォーマンス低下をもたらす。

ブロッキング I/O が混入する問題に対するアプローチは、言語や処理系、ライブラリによっていくつかある。 例えば JavaScript (Node.js) では、プログラミング言語自体にブロッキング I/O を扱う API がない。 それ以外だと、スクリプト言語ならモンキーパッチで動的に実装を書き換えてしまうというものもある。

参考文献

詳解UNIXプログラミング 第3版

詳解UNIXプログラミング 第3版

Python: KMeans 法を実装してみる

KMeans 法は、機械学習における教師なし学習のクラスタリングという問題を解くためのアルゴリズム。 教師なし学習というのは、事前に教師データというヒントが与えられないことを指している。 その上で、クラスタリングというのは未知のデータに対していくつかのまとまりを作る問題をいう。

今回取り扱う KMeans 法は、比較的単純なアルゴリズムにも関わらず広く使われているものらしい。 実際に書いてみても、基本的な実装であればたしかにとてもシンプルだった。 ただし、データの初期化をするところで一点考慮すべき内容があることにも気づいたので、それについても書く。 KMeans 法の具体的なアルゴリズムについてはサンプルコードと共に後述する。

今回使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32
$ python --version
Python 3.5.3

依存パッケージをインストールする

あらかじめ、今回ソースコードで使う依存パッケージをインストールしておく。 グラフ描画ライブラリの matplotlib を Mac OS X で動かすには、pip でのインストール以外にもちょっとした設定が必要になる。

$ pip install scipy scikit-learn matplotlib
$ mkdir -p ~/.matplotlib
$ cat << 'EOF' > ~/.matplotlib/matplotlibrc
backend: TkAgg
EOF

まずは scikit-learn のお手本から

本来はお手本の例については後回しにしたいんだけど、今回は構成の都合で先に示しておく。 Python の機械学習系ライブラリである scikit-learn には KMeans 法の実装も用意されている。 自分で書いた実装をお披露目する前に、どんなものなのか見てもらいたい。

次のサンプルコードでは、ダミーのデータを生成した上でそれをクラスタリングしている。 scikit-learn にはデータセットを生成する機能も備わっている。 その中でも make_blobs() という関数は、いくつかのまとまりを持ったダミーデータを作ってくれる。 あとは、そのダミーデータを KMeans 法の実装である sklearn.cluster.KMeans で処理させている。 クラスタリングした処理結果は matplotlib を使って可視化した。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt
from sklearn import datasets
from sklearn.cluster import KMeans


def main():
    # クラスタ数
    N_CLUSTERS = 5

    # Blob データを生成する
    dataset = datasets.make_blobs(centers=N_CLUSTERS)

    # 特徴データ
    features = dataset[0]
    # 正解ラベルは使わない
    # targets = dataset[1]

    # クラスタリングする
    cls = KMeans(n_clusters=N_CLUSTERS)
    pred = cls.fit_predict(features)

    # 各要素をラベルごとに色付けして表示する
    for i in range(N_CLUSTERS):
        labels = features[pred == i]
        plt.scatter(labels[:, 0], labels[:, 1])

    # クラスタのセントロイド (重心) を描く
    centers = cls.cluster_centers_
    plt.scatter(centers[:, 0], centers[:, 1], s=100,
                facecolors='none', edgecolors='black')

    plt.show()


if __name__ == '__main__':
    main()

上記のサンプルコードでは、クラスタ数として適当に 5 を選んだ。 ここで「クラスタ数を選ぶ」というのはポイントとなる。 KMeans 法では、あらかじめクラスタ数をハイパーパラメータとして指定するためだ。 つまり、データをいくつのまとまりとして捉えるかを人間が教えてやる必要がある。 クラスタリングの手法によっては、これをアルゴリズムがやってくれる場合もある。

それでは、上記のサンプルコードを実行してみよう。

$ python kmeans_scikit.py

すると、こんな感じのグラフが出力される。 それぞれの特徴ベクトルが所属するクラスタが、別々の色で表現されている。 また、それぞれのクラスタの真ん中らへんにある黒い円は、後述するセントロイドという概念を表している。 f:id:momijiame:20170319081628p:plain ちなみに、生成されるダミーデータは毎回異なるので、出力されるグラフも異なる。

自分で実装してみる

scikit-learn がクラスタリングしたお手本を見たところで、次は KMeans 法を自分で書いてみよう。

まず、KMeans 法ではセントロイド (重心) という概念が重要になる。 セントロイドというのは、文字通りクラスタの中心に置かれるものだ。 アルゴリズムの第一歩としては、このセントロイドを作ることになる。 セントロイドは、前述した通り各クラスタの中心に置かれる。 そのため、セントロイドはハイパーパラメータとして指定したクラスタ数だけ必要となる。 また、中心というのは、クラスタに属する特徴ベクトルの各次元でそれらの平均値にいる、ということ。

そして、データセットに含まれる特徴ベクトルは、必ず最寄りのセントロイドに属する。 ここでいう最寄りとは、特徴ベクトル同士のユークリッド距離が近いという意味だ。 属したセントロイドが、それぞれのクラスタを表している。

上記の基本的な概念を元に KMeans 法のアルゴリズムは次のようなステップに分けることができる

  • 最初のセントロイドを何らかの方法で決める
  • 特徴ベクトルを最寄りのセントロイドに所属させる
  • 各クラスタごとにセントロイドを再計算する
  • 上記 2, 3 番目の操作を繰り返す
  • もしイテレーション前後で所属するセントロイドが変化しなければ、そこで処理を終了する

ここで、一番最初のステップである「最初のセントロイドを何らかの方法で決める」のがポイントだった。 例えば、次のページではあらかじめ各特徴ベクトルをランダムにクラスタリングしてから、セントロイドを決めるやり方を取っている。

tech.nitoyon.com

上記のやり方であれば、次のようなサンプルコードになる。 ここでは KMeans というクラスで KMeans 法を実装している。 インターフェースは、先ほどお手本として提示した scikit-learn と揃えた。 そのため main() 関数は基本的に変わっておらず、見るべきは KMeans クラスだけとなっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt
from sklearn import datasets
import numpy as np


class KMeans(object):
    """KMeans 法でクラスタリングするクラス"""

    def __init__(self, n_clusters=2, max_iter=300):
        """コンストラクタ

        Args:
            n_clusters (int): クラスタ数
            max_iter (int): 最大イテレーション数
        """
        self.n_clusters = n_clusters
        self.max_iter = max_iter

        self.cluster_centers_ = None

    def fit_predict(self, features):
        """クラスタリングを実施する

        Args:
            features (numpy.ndarray): ラベル付けするデータ

        Returns:
            numpy.ndarray: ラベルデータ
        """
        # 初期データは各要素に対してランダムにラベルをつける
        pred = np.random.randint(0, self.n_clusters, len(features))

        # クラスタリングをアップデートする
        for _ in range(self.max_iter):

            # 各クラスタごとにセントロイド (重心) を計算する
            self.cluster_centers_ = np.array([features[pred == i].mean(axis=0)
                                              for i in range(self.n_clusters)])

            # 各特徴ベクトルから最短距離となるセントロイドを基準に新しいラベルをつける
            new_pred = np.array([
                np.array([
                    self._euclidean_distance(p, centroid)
                    for centroid in self.cluster_centers_
                ]).argmin()
                for p in features
            ])

            if np.all(new_pred == pred):
                # 更新前と内容を比較して、もし同じなら終了
                break

            pred = new_pred

        return pred

    def _euclidean_distance(self, p0, p1):
        return np.sum((p0 - p1) ** 2)


def main():
    # クラスタ数
    N_CLUSTERS = 5

    # Blob データを生成する
    dataset = datasets.make_blobs(centers=N_CLUSTERS)

    # 特徴データ
    features = dataset[0]
    # 正解ラベルは使わない
    # targets = dataset[1]

    # クラスタリングする
    cls = KMeans(n_clusters=N_CLUSTERS)
    pred = cls.fit_predict(features)

    # 各要素をラベルごとに色付けして表示する
    for i in range(N_CLUSTERS):
        labels = features[pred == i]
        plt.scatter(labels[:, 0], labels[:, 1])

    centers = cls.cluster_centers_
    plt.scatter(centers[:, 0], centers[:, 1], s=100,
                facecolors='none', edgecolors='black')

    plt.show()


if __name__ == '__main__':
    main()

上記を実行してみよう。

$ python kmeans_random.py

実行すると、次のようなグラフが表示される。 f:id:momijiame:20170319153805p:plain 一見すると、上手くいっているようだ。

しかし、何度か実行すると、次のようなエラーが出ることがある。

$ python kmeans_random.py 
kmeans_random.py:41: RuntimeWarning: Mean of empty slice.
  for i in range(self.n_clusters)])
/Users/amedama/.virtualenvs/kmeans/lib/python3.5/site-packages/numpy/core/_methods.py:73: RuntimeWarning: invalid value encountered in true_divide
  ret, rcount, out=ret, casting='unsafe', subok=False)

表示されるグラフは、次のようなものになる。 f:id:momijiame:20170319153914p:plain 何やら、ぜんぜん上手いことクラスタリングできていない。

上記が起こる原因について調べたところ、セントロイドの初期化に問題があることが分かった。 初期化時に特徴ベクトルに対してランダムにラベルをつけるやり方では、一つの特徴ベクトルも属さないセントロイドが生じる恐れがあるためだ。

ランダムにラベルをつけるやり方では、セントロイドはおのずとデータセット全体の平均あたりに生じやすくなる。 場合によっては、セントロイドが別のセントロイドに囲まれるなどして、一つの特徴ベクトルも属さないセントロイドがでてくる。 サンプルコードでは、そのような事態を想定していなかったのでセントロイドが計算できなくなって上手く動作しなかった。

先ほどの KMeans 法を図示しているページでも、特徴ベクトルの数 N に対してクラスタの数 k を増やして実行してみよう。 一つの特徴ベクトルも属さないクラスタが生じることが分かる。

初期化を改良してみる

一つの特徴ベクトルも属さないクラスタが生じることについて、問題がないと捉えることもできるかもしれない。 その場合には、一つも属さないクラスタのセントロイドを、前回の位置から動かなさないようにすることでケアできそうだ。

とはいえ、ここでは別のアプローチで問題を解決してみることにする。 それというのは、初期のセントロイドをデータセットに含まれるランダムな特徴ベクトルにする、というものだ。 つまり、最初に作るセントロイドの持つ特徴ベクトルがランダムに選んだ要素の特徴ベクトルと一致する。 こうすれば各セントロイドは、少なくとも一つ以上の特徴ベクトルが属することは確定できる。 ユークリッド距離がゼロになる特徴ベクトルが、必ず一つは存在するためだ。

次のサンプルコードでは、初期のセントロイドの作り方だけを先ほどと変更している。 ここでポイントとなるのは、最初にセントロイドとして採用する特徴ベクトルは重複しないように決める必要があるということだ。 そこで、初期のセントロイドは特徴ベクトルをシャッフルした上で先頭から k 個を取り出して決めている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt
from sklearn import datasets
import numpy as np


class KMeans(object):
    """KMeans 法でクラスタリングするクラス"""

    def __init__(self, n_clusters=2, max_iter=300):
        """コンストラクタ

        Args:
            n_clusters (int): クラスタ数
            max_iter (int): 最大イテレーション数
        """
        self.n_clusters = n_clusters
        self.max_iter = max_iter

        self.cluster_centers_ = None

    def fit_predict(self, features):
        """クラスタリングを実施する

        Args:
            features (numpy.ndarray): ラベル付けするデータ

        Returns:
            numpy.ndarray: ラベルデータ
        """
        # 要素の中からセントロイド (重心) の初期値となる候補をクラスタ数だけ選び出す
        feature_indexes = np.arange(len(features))
        np.random.shuffle(feature_indexes)
        initial_centroid_indexes = feature_indexes[:self.n_clusters]
        self.cluster_centers_ = features[initial_centroid_indexes]

        # ラベル付けした結果となる配列はゼロで初期化しておく
        pred = np.zeros(features.shape)

        # クラスタリングをアップデートする
        for _ in range(self.max_iter):
            # 各要素から最短距離のセントロイドを基準にラベルを更新する
            new_pred = np.array([
                np.array([
                    self._euclidean_distance(p, centroid)
                    for centroid in self.cluster_centers_
                ]).argmin()
                for p in features
            ])

            if np.all(new_pred == pred):
                # 更新前と内容が同じなら終了
                break

            pred = new_pred

            # 各クラスタごとにセントロイド (重心) を再計算する
            self.cluster_centers_ = np.array([features[pred == i].mean(axis=0)
                                              for i in range(self.n_clusters)])

        return pred

    def _euclidean_distance(self, p0, p1):
        return np.sum((p0 - p1) ** 2)


def main():
    # クラスタ数
    N_CLUSTERS = 5

    # Blob データを生成する
    dataset = datasets.make_blobs(centers=N_CLUSTERS)

    # 特徴データ
    features = dataset[0]
    # 正解ラベルは使わない
    # targets = dataset[1]

    # クラスタリングする
    cls = KMeans(n_clusters=N_CLUSTERS)
    pred = cls.fit_predict(features)

    # 各要素をラベルごとに色付けして表示する
    for i in range(N_CLUSTERS):
        labels = features[pred == i]
        plt.scatter(labels[:, 0], labels[:, 1])

    centers = cls.cluster_centers_
    plt.scatter(centers[:, 0], centers[:, 1], s=100,
                facecolors='none', edgecolors='black')

    plt.show()


if __name__ == '__main__':
    main()

それでは、上記のサンプルコードを実行してみよう。

$ python kmeans_select.py

これまでと変わらないけど、次のようなグラフが表示されるはず。 f:id:momijiame:20170319155439p:plain

今度は N_CLUSTERS を増やしたり、何度やっても先ほどのようなエラーにはならない。

まとめ

今回は教師なし学習のクラスタリングという問題を解くアルゴリズムの KMeans 法を実装してみた。 KMeans 法はシンプルなアルゴリズムだけど、セントロイドをどう初期化するかは流派があるみたい。

はじめてのパターン認識

はじめてのパターン認識

Python: k 近傍法を実装してみる

k 近傍法 (k-Nearest Neighbor algorithm) というのは、機械学習において教師あり学習分類問題を解くためのアルゴリズム。 教師あり学習における分類問題というのは、あらかじめ教師信号として特徴ベクトルと正解ラベルが与えられるものをいう。 その教師信号を元に、未知の特徴ベクトルが与えられたときに正解ラベルを予想しましょう、というもの。

k 近傍法は機械学習アルゴリズムの中でも特にシンプルな実装になっている。 じゃあ、シンプルな分だけ性能が悪いかというと、そんなことはない。 分類精度であれば、他のアルゴリズムに比べても引けを取らないと言われている。 ただし、計算量が多いという重大な欠点がある。 そのため、それを軽減するための改良アルゴリズムも数多く提案されている。

k 近傍法では、与えられた未知の特徴ベクトルを、近い場所にある教師信号の正解ラベルを使って分類する。 特徴ベクトルで近くにあるものは似たような性質を持っているはず、という考え方になっている。 今回は、そんな k 近傍法の基本的な実装を Python で書いてみることにした。

使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32
$ python --version
Python 3.5.3

依存パッケージをインストールする

あらかじめ、今回のソースコードで使う依存パッケージをインストールしておく。

$ pip install numpy scipy scikit-learn

最近傍法を実装してみる

k 近傍法では、未知の特徴ベクトルの近くにある k 点の教師信号を用いる。 この k 点を 1 にしたときのことを特に最近傍法 (Nearest Neighbor algorithm) と呼ぶ。 一番近い場所にある教師信号の正解ラベルを返すだけなので、さらに実装しやすい。 そこで、まずは最近傍法から書いてみることにしよう。

次のサンプルコードでは最近傍法を NearestNeighbors というクラスで実装している。 インターフェースは scikit-learn っぽくしてみた。 分類するデータセットは Iris (あやめ) を使っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np

from sklearn import datasets
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score


class NearestNeighbors(object):

    def __init__(self):
        self._train_data = None
        self._target_data = None

    def fit(self, train_data, target_data):
        """訓練データを学習する"""
        # あらかじめ計算しておけるものが特にないので保存だけする
        self._train_data = train_data
        self._target_data = target_data

    def predict(self, x):
        """訓練データから予測する"""
        # 判別する点と教師データとのユークリッド距離を計算する
        distances = np.array([self._distance(p, x) for p in self._train_data])
        # 最もユークリッド距離の近い要素のインデックスを得る
        nearest_index = distances.argmin()
        # 最も近い要素のラベルを返す
        return self._target_data[nearest_index]

    def _distance(self, p0, p1):
        """二点間のユークリッド距離を計算する"""
        return np.sum((p0 - p1) ** 2)


def main():
    # Iris データセットをロードする
    iris_dataset = datasets.load_iris()

    # 特徴データとラベルデータを取り出す
    features = iris_dataset.data
    targets = iris_dataset.target

    # LOO 法で汎化性能を調べる
    predicted_labels = []

    loo = LeaveOneOut()
    for train, test in loo.split(features):
        train_data = features[train]
        target_data = targets[train]

        # モデルを学習させる
        model = NearestNeighbors()
        model.fit(train_data, target_data)
        
        # 一つ抜いたテストデータを識別させる
        predicted_label = model.predict(features[test])
        predicted_labels.append(predicted_label)

    # 正解率を出力する
    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記のサンプルコードでは Leave-One-Out 法というやり方で交差検証をしている。

交差検証というのは、学習に使わなかったデータを使って正解を導くことができたか調べる方法を指す。 モデルの性能は、未知のデータに対する対処能力で比べる必要がある。 この、未知のデータに対する対処能力のことを汎化性能と呼ぶ。 交差検証をすることで、この汎化性能を測ることができる。

Leave-One-Out 法では、教師信号の中から検証用のデータをあらかじめ一つだけ抜き出しておく。 そして、それをモデルが正解できるのか調べるやり方だ。 抜き出す対象を一つずつずらしながら、データセットに含まれる要素の数だけ繰り返す。 他の交差検証に比べると計算量は増えるものの、厳密で分かりやすい。

上記のサンプルコードの実行結果は次の通り。

$ python nn.py 
0.96

汎化性能で 96% の正解率が得られた。

scikit-learn を使う場合

ちなみに、自分で書く代わりに scikit-learn にある実装を使う場合も紹介しておく。

次のサンプルコードは k 近傍法の実装を scikit-learn の KNeighborsClassifier に代えたもの。 インターフェースを揃えてあったので、使うクラスが違う以外は先ほどと同じソースコードになっている。 scikit-learn で最近傍法をしたいときは KNeighborsClassifier の k に 1 を指定するだけで良い。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sklearn import datasets
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier


def main():
    iris_dataset = datasets.load_iris()

    features = iris_dataset.data
    targets = iris_dataset.target

    predicted_labels = []

    loo = LeaveOneOut()
    for train, test in loo.split(features):
        train_data = features[train]
        target_data = targets[train]

        model = KNeighborsClassifier(n_neighbors=1)
        model.fit(train_data, target_data)

        predicted_label = model.predict(features[test])
        predicted_labels.append(predicted_label)

    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記のサンプルコードの実行結果は次の通り。

$ python knn_scikit.py 
0.96

当然だけど同じ班化性能になっている。

k 近傍法を実装してみる

先ほど示した最近傍法の実装では、最寄りの教師信号だけを使うものとなっていた。 今度は、より汎用的に近くにある k 点の教師信号を使う実装にしてみる。

次のサンプルコードでは KNearestNeighbors クラスのコンストラクタに k を渡せるようになっている。 実装としては、分類するときに教師信号をユークリッド距離でソートした上で k 個を取り出している。 ひとまず k については 3 を指定した。 もしこれを 1 にすれば最近傍法になる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from collections import Counter

import numpy as np

from sklearn import datasets
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score


class KNearestNeighbors(object):

    def __init__(self, k=1):
        self._train_data = None
        self._target_data = None
        self._k = k

    def fit(self, train_data, target_data):
        """訓練データを学習する"""
        # あらかじめ計算しておけるものが特にないので保存だけする
        self._train_data = train_data
        self._target_data = target_data

    def predict(self, x):
        """訓練データから予測する"""
        # 判別する点と教師データとのユークリッド距離を計算する
        distances = np.array([self._distance(p, x) for p in self._train_data])
        # ユークリッド距離の近い順でソートしたインデックスを得る
        nearest_indexes = distances.argsort()[:self._k]
        # 最も近い要素のラベルを返す
        nearest_labels = self._target_data[nearest_indexes]
        # 近傍のラベルで一番多いものを予測結果として返す
        c = Counter(nearest_labels)
        return c.most_common(1)[0][0]

    def _distance(self, p0, p1):
        """二点間のユークリッド距離を計算する"""
        return np.sum((p0 - p1) ** 2)


def main():
    iris_dataset = datasets.load_iris()

    features = iris_dataset.data
    targets = iris_dataset.target

    predicted_labels = []

    loo = LeaveOneOut()
    for train, test in loo.split(features):
        train_data = features[train]
        target_data = targets[train]

        model = KNearestNeighbors(k=3)
        model.fit(train_data, target_data)

        predicted_label = model.predict(features[test])
        predicted_labels.append(predicted_label)

    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。

$ python knn.py 
0.96

汎化性能は k=1 のときと変わらないようだ。

最適な k を探す

k 近傍法では、計算に近傍何点を使うか (ようするに k) がハイパーパラメータとなっている。 ハイパーパラメータというのは、機械学習において人間が調整する必要のあるパラメータのことをいう。

次は、最適な k を探してみることにする。 といっても、やることは単に総当りで探すだけ。

せっかくならパラメータによる汎化性能の違いを可視化したい。 そこで matplotlib も入れておこう。

$ pip install matplotlib
$ mkdir -p ~/.matplotlib
$ cat << 'EOF' > ~/.matplotlib/matplotlibrc
backend: TkAgg
EOF

次のサンプルコードでは k を 1 ~ 20 の間で調整しながら総当りで汎化性能を計算している。 データセットごとに最適な k が異なるところを見ておきたいので Iris (あやめ) と Digits (数字) で調べることにした。 自分で実行するときは、データセットのロード部分にあるコメントアウトを切り替えてほしい。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from matplotlib import pyplot as plt

from sklearn import datasets
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier


def main():
    # データセットをロードする
    dataset = datasets.load_digits()
    # dataset = datasets.load_iris()

    # 特徴データとラベルデータを取り出す
    features = dataset.data
    targets = dataset.target

    # 検証する近傍数の上限
    K = 20
    ks = range(1, K + 1)

    # 使う近傍数ごとに正解率を計算する
    accuracy_scores = []
    for k in ks:
        # Leave-One-Out 法で汎化性能を測る
        predicted_labels = []
        loo = LeaveOneOut()
        for train, test in loo.split(features):
            train_data = features[train]
            target_data = targets[train]

            # モデルを学習させる    
            model = KNeighborsClassifier(n_neighbors=k)
            model.fit(train_data, target_data)
    
            # 一つだけ取り除いたテストデータを識別させる
            predicted_label = model.predict(features[test])
            predicted_labels.append(predicted_label)
    
        # 正解率を計算する
        score = accuracy_score(targets, predicted_labels)
        print('k={0}: {1}'.format(k, score))

        accuracy_scores.append(score)

    # 使う近傍数ごとの正解率を折れ線グラフで可視化する
    X = list(ks)
    plt.plot(X, accuracy_scores)

    plt.xlabel('k')
    plt.ylabel('accuracy rate')
    plt.show()
    

if __name__ == '__main__':
    main()

まずはデータセットとして Digits を使ったときから。 実行結果は次のようになる。

$ python knn_tuning.py 
k=1: 0.988313856427379
k=2: 0.986644407345576
k=3: 0.988313856427379
k=4: 0.9877573734001113
k=5: 0.9877573734001113
k=6: 0.9855314412910406
k=7: 0.9855314412910406
k=8: 0.9844184752365053
k=9: 0.9833055091819699
k=10: 0.9821925431274346
k=11: 0.9844184752365053
k=12: 0.9827490261547023
k=13: 0.9844184752365053
k=14: 0.9816360601001669
k=15: 0.9816360601001669
k=16: 0.9805230940456316
k=17: 0.9805230940456316
k=18: 0.9794101279910963
k=19: 0.9766277128547579
k=20: 0.9782971619365609

どうやら Digits のときは k を 1 か 3 にするのが良さそうだ。 f:id:momijiame:20170318133735p:plain

続いて Iris を使ったとき。

$ python knn_tuning.py 
k=1: 0.96
k=2: 0.9466666666666667
k=3: 0.96
k=4: 0.96
k=5: 0.9666666666666667
k=6: 0.96
k=7: 0.9666666666666667
k=8: 0.9666666666666667
k=9: 0.9666666666666667
k=10: 0.9733333333333334
k=11: 0.9733333333333334
k=12: 0.96
k=13: 0.9666666666666667
k=14: 0.9733333333333334
k=15: 0.9733333333333334
k=16: 0.9666666666666667
k=17: 0.9733333333333334
k=18: 0.9733333333333334
k=19: 0.98
k=20: 0.98

今度は全然違うグラフになった。 どうやら Iris なら k は 20 にするのが良いらしい。 もしかすると、さらに増やすと良い可能性もある? f:id:momijiame:20170318133823p:plain

まとめ

今回は Python を使って教師あり学習の分類問題を解くためのアルゴリズムの一つ、k 近傍法を実装してみた。 k 近傍法は単純な割に分類精度は決して低くないものの、計算量が多いという欠点がある。 k 近傍法では、計算に近傍何点を使うのが適しているかはデータセットによって異なる。 そのため、異なる k を使って汎化性能を測定して決定しよう。

ちなみに、計算量の多さを軽減するための手法としては、圧縮型 k 近傍法、分岐限定法、疑似最近傍探索などがあるようだ。 それらについては、機会があれば改めて実装してみたい。

はじめてのパターン認識

はじめてのパターン認識

Python: データセットを標準化する効果を最近傍法で確かめる

データセットの標準化については、このブログでも何回か扱っている。 しかし、実際にデータセットを標準化したときの例については試していなかった。

blog.amedama.jp

blog.amedama.jp

そこで、今回は UCI の提供する小麦 (seeds) データセットを最近傍法で分類したときに、スコアが上がる様を見てみたいと思う。 あらかじめ、いくつかの説明変数が教師信号として与えられるので、そこから小麦の品種を分類 (Classification) する。

UCI Machine Learning Repository: seeds Data Set

今回使った環境は次の通り。

$ sw_vers 
ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32
$ python --version
Python 3.5.3

下準備

まずは今回使う Python のパッケージを pip でインストールする。 どれも科学計算系で定番のやつ。

$ pip install numpy pandas scipy scikit-learn

データセットを標準化しない場合

今回使う最近傍法というアルゴリズムでは、分類したい点から最も近くにあるデータの種別を使って分類する。 近さにはユークリッド距離を使うため、データセットの説明変数の大きさや単位に影響を受けやすい。 例えば説明変数の中に 1,000m ~ 10,000m を取る次元と 0.1cm ~ 1cm を取る次元があるとしよう。 当然ながら、ユークリッド距離を計算するとそのままでは前者の次元の影響が大きくなってしまう。 今回使うデータセットでも原理的には同じ問題が発生する。

次のサンプルコードでは、データセットを標準化しない状態で最近傍法を使った分類を実施している。 そして Leave-One-Out 法を使って、計測した汎化性能を出力する。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier


def main():
    # 小麦データセットをダウンロードする
    dataset_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00236/seeds_dataset.txt'  # noqa
    df = pd.read_csv(dataset_url, delim_whitespace=True, header=None)

    # データフレームから説明変数と目的変数を取り出す
    features = df.loc[:, :6].get_values()
    targets = df.loc[:, 7].get_values()

    # 予測した結果の入れ物
    predicted_labels = []

    # LOO で交差検証する
    loo = LeaveOneOut()
    for train, test in loo.split(features):
        train_data = features[train]
        target_data = targets[train]

        # k-NN 法を使う
        model = KNeighborsClassifier(n_neighbors=1)
        # 訓練データを学習させる
        model.fit(train_data, target_data)
        # テストデータを予測させる
        predicted_label = model.predict(features[test])
        # 予測した結果を追加する
        predicted_labels.append(predicted_label)

    # 正解率を出力する
    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記のサンプルコードの実行結果は次の通り。 データセットを標準化しない状態では約 90.5% の汎化性能が得られた。

$ python withoutnorm.py
0.904761904762

データセットを標準化する場合

それでは、次はデータセットを標準化する場合を試してみよう。 標準化されたデータセットでは、説明変数の各次元の値が平均 0 で標準偏差が 1 になる。 つまり、元々の単位や大きさは無くなってそれぞれの値の間隔の比率だけが残されることになる。

次のサンプルコードでは、先ほどとデータセットを標準化するところだけ変更している。 データセットの標準化には scikit-learn に用意されている StandardScaler を用いた。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler


def main():
    dataset_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00236/seeds_dataset.txt'  # noqa
    df = pd.read_csv(dataset_url, delim_whitespace=True, header=None)

    features = df.loc[:, :6].get_values()
    targets = df.loc[:, 7].get_values()

    # データセットを Z-Score に標準化する
    sc = StandardScaler()
    sc.fit(features)
    normalized_features = sc.transform(features)

    predicted_labels = []

    loo = LeaveOneOut()
    for train, test in loo.split(normalized_features):
        train_data = normalized_features[train]
        target_data = targets[train]

        model = KNeighborsClassifier(n_neighbors=1)
        model.fit(train_data, target_data)

        predicted_label = model.predict(normalized_features[test])
        predicted_labels.append(predicted_label)

    score = accuracy_score(targets, predicted_labels)
    print(score)


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 今度は汎化性能が約 93.8% に上昇している。 データセットを標準化するだけで分類精度が 3.3% 上がった。

$ python withnorm.py
0.938095238095

まとめ

データセットを標準化することで、使う機械学習アルゴリズムによっては汎化性能を上げることができる。

Python: Pykka でアクターモデルについて学ぶ

アクターモデルというのは、並行処理のプログラミングモデルの一つだ。 並行処理という言葉からは、まずマルチスレッドとかをイメージすると思うけど、それよりも抽象度の高い概念となっている。 つまり、アクターモデルというのはマルチスレッドなどを用いて構築することになる。 どちらかといえばプロセス間通信 (IPC) の技法であって、共有メモリやロック、RPC と比較するものかもしれない。

そんなアクターモデルは、概念とか使ったときの嬉しさを理解・実感するのがなかなか難しいモデルだとも思う。 理由としては、使い始めるまでに必要なコード量が多かったり、それなりの規模のアプリケーションで使わないとメリットが分かりづらい点が挙げられる。 ただ、これはあくまで主観的なものだけど、アクターモデルをベースに組まれたアプリケーションは規模が大きくなっても並行処理をしているコードが読みやすい。 共有メモリやロックを使う場合だと、平行処理できるパートの量と可読性がトレードオフになりがちな気がするのとは対照的な感じがしている。

上記の理由としては、それぞれの手法のアプローチが次のように根本的に異なるためだと思う。

  • 共有メモリやロックを使うやり方
    • 考え方のベースはシングルスレッド
    • ボトルネックになっている末端の処理を局所的に並行にしていく
  • アクターモデル
    • 考え方のベースはマルチスレッド
    • 末端の処理はシングルスレッドで処理される
      • ただし、アクターを横に並べることで並行度を上げられる

今回は、そんなアクターモデルを Python で実装するためのフレームワークである Pykka を使ってみることにする。 ざっと調べた感じ、この Pykka が Python で汎用的なアクターモデルを実装するためのパッケージとしては最も有名そうだった。 Pykka は Scala/Java でアクターモデルを実装するためのフレームワークである Akka の影響を受けている。 使われている用語の多くも Akka から取られているらしいけど、まあ使う分には特に意識する必要はない。

ただし、Pykka は単純に Akka を Python で再実装したものではない。 次のコンポーネントは明示的にサポートしないことが宣言されている。 もし欲しければ自分で作り込まなきゃいけない。

  • スーパーバイザ
    • セルフヒーリングとかを実現するやつ
  • メッセージルータ
    • 並行処理を書きやすくしたりアプリケーションの骨子を作るためのやつ
  • ネットワーク越しの分散処理
    • 並行処理のタスクをネットワーク越しの各ホストに分配するやつ

今回使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32
$ python --version
Python 3.6.0

インストール

インストールは Python のパッケージマネージャである pip を使ってさくっとできる。

$ pip install pykka

もし pip が入っていないときはインストールしよう。

$ curl https://bootstrap.pypa.io/get-pip.py | sudo python

あと、システムの実行環境を汚さないように Python 仮想環境を使えるようにしておくのがおすすめ。 これには例えば virtualenv なんかを使う。

$ sudo pip install virtualenv

そこらへんはお好みで。

はじめてのアクターモデル

アクターモデルでは、その名の通りアクターというコンポーネントを組み合わせてアプリケーションを構築していく。

アクターという概念を構成する要素は、実のところたったの三つしかない。 まず、他のアクターから何らかのデータを受け取るために用意されたメッセージキューが一つ目。 そして、その受け取ったデータを渡す先となるメッセージハンドラが二つ目。 最後に、メッセージキューからデータを取り出して、それをメッセージハンドラにディスパッチする役目のスレッドが三つ目だ。 この三つの要素が、それぞれのアクターインスタンスに必ず備わっている。 メッセージキューとスレッドはデフォルトで一つずつ、メッセージハンドラについては自分で何をするか記述する。

重要な点としては、アクター同士はメモリなどを共有することがない。 することがないというよりも、する必要がない・するとアクターモデルを使う意味が薄れるという方が正しい。 アクターモデルでは、何かを共有することなくお互いのメッセージキューにメッセージを放り込み合うことで情報をやり取りして処理を進めていく。 いわゆるメッセージパッシングと呼ばれるものだ。

また、アクターの中と外の世界は非同期の壁で断絶されている。 アクターの中身を、その外側から同期的に直接触ることはできない。 触ることができてしまうと、処理の競合が起こってしまうからだ。 アクターの中を触るには、いわゆる Future を必ず介すことになる。 処理はキューイングされていつかは実行されるけど、具体的にそれがいつかは分からない状態でアクセスする。

説明が長くなったけど、そろそろ具体的なサンプルコードを見ていくことにしよう。 以下は Pykka を使って作る、最も単純なアプリケーションの一つだ。 このアプリケーションではメッセージを受け取って、それを出力するだけのアクターを定義している。 説明はコードの中にコメントとして記述している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):
    """メッセージを受け取って挨拶をするアクター"""

    def on_receive(self, message):
        """メッセージが届いたときに実行されるハンドラ"""
        # メッセージの中から名前を取り出す
        name = message['name']
        # 内容をプリントする
        print('Hello, {0}!'.format(name))


def main():
    # アクターを起動する
    actor_ref = GreetingActor.start()

    # アクターにメッセージを送る (一方向)
    actor_ref.tell({'name': 'World'})

    # アクターを停止する (本当はちょっと待った方が良いかも)
    actor_ref.stop()


if __name__ == '__main__':
    main()

アクターは pykka.ThreadingActor というクラスを継承して作る。 on_receive() というメソッドはアクターがメッセージを受け取って、それがディスパッチされたときに発火するハンドラだ。 サンプルコードでは、そのアクターを起動してメッセージを送りつけている。 アクターは start() メソッドが呼ばれた瞬間からメッセージキューを読み出すスレッドが起動して動き出す。 あとは、アクターに tell() メソッドでメッセージを送ることができる。

また、ポイントとしてはアクターを起動したときに返ってくる内容が ActorRef というアクターへの間接的な参照になっている点だ。 つまり、起動したアクターそのものを直接触ることはできず、必ずこの参照を経由してアクセスすることになる。 直接触ることを防ぐことによって、アクターの中で競合を起こさないようにしているのだろう。

ちなみに、上記のサンプルコードではメッセージを送りつけた直後にアクターを停止しているけど、本当はちょっと待った方がお行儀が良いのかもしれない。 ちょっと待つというのは、具体的にはメインスレッドをスリープさせるということだ。 何故なら、キューからのメッセージの取り出しおよびハンドラの発火は別スレッドで進むため、すぐに実行される保証がないから。 とはいえ、他にやっていることもないからすぐに処理されるはずで、現実的には問題ないんだけど。

上記のサンプルコードを実行すると、次のようになる。

$ python hellopykka.py
Hello, World!

ただのハローワールドだけど、これはアクターモデルで実装されている!

アクターモデルのコンストラクタに引数を渡す

ここからは、アクターモデルというより Pykka の使い方の説明に入っていく。

先ほどのサンプルコードでは、アクターにメッセージの形で引数を渡した。 とはいえ、起動時の初期パラメータを設定したい場合も多いと思う。 そんなときはコンストラクタに引数を用意すれば良い。 親クラスのコンストラクタは必ず呼ぶこと。 というより、呼ばないと例外になる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def __init__(self, greeting='Hello'):
        # 親クラスのコンストラクタは必ず呼ぶ
        super().__init__()

        # 挨拶の内容をカスタマイズできるようにする
        self.greeting = greeting

    def on_receive(self, message):
        name = message['name']
        print('{0}, {1}!'.format(self.greeting, name))


def main():
    # アクターを起動するときに使ったパラメータがコンストラクタに渡される
    actor_ref = GreetingActor.start('Hajimemashite')

    actor_ref.tell({'name': 'Sekai'})

    actor_ref.stop()


if __name__ == '__main__':
    main()

コンストラクタに渡す引数はアクターの起動時に渡す。

上記のサンプルコードを実行した結果は次の通り。

$ python initpykka.py
Hajimemashite, Sekai!

コンストラクタで渡したメッセージが使われている。

返り値のあるメッセージパッシング

これまでの例では、アクターにメッセージを送っておしまいという、一方的なものだった。 アクターに対して一方的にメッセージを送りつけるには tell() メソッドを使う。 とはいえ、何かメッセージを渡して、その結果として返り値を戻してほしいというのはよくある。

そんなときはメッセージを送りつけるときに ask() メソッドを使う。 また、メッセージハンドラでは返り値を戻すように作る。 返り値を受け取る側では、いくつか受け取り方を選ぶことができる。 次のサンプルコードでは、戻り値のあるパターンを実装している。 そして、返り値の受け取り側でもいくつかのやり方を試している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def on_receive(self, message):
        name = message['name']
        # 返り値があると ask() メソッドでその値が得られるようになる
        return 'Hello, {}!'.format(name)


def main():
    actor_ref = GreetingActor.start()

    # ActorRef#ask() を使うとアクターから返り値が得られる (返事があるまでブロックする)
    answer = actor_ref.ask({'name': 'World'})
    print(answer)

    # ブロックする時間にタイムアウトを設定したいときは timeout を指定する
    answer = actor_ref.ask({'name': 'Timeout'}, timeout=1)
    print(answer)

    # 処理をブロックさせたくないときは block=False を指定すると
    # 返り値が Future オブジェクトになる
    future = actor_ref.ask({'name': 'Future'}, block=False)
    # Future オブジェクトから結果を取り出す (ここでブロックする)
    answer = future.get()
    print(answer)

    future = actor_ref.ask({'name': 'Future-Timeout'}, block=False)
    # Future から結果を取り出すときのタイムアウトも指定できる
    answer = future.get(timeout=1)
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記で返り値を受け取るやり方がいくつかあることについて補足しておく。 前述した通り、アクターの内外は非同期の壁で断絶されている。 つまり、メッセージに対する返事が返ってくるタイミングは、将来であることは分かるものの具体的にいつかは分からないのだ。

そのため、返事の受け取り方として「返ってくるまですぐにブロックする」か「とりあえず受理された旨の返事だけ受け取る」やり方を選べるわけだ。 後者では、いわゆる Future を受け取っている。

先ほどのサンプルコードを実行した結果は次の通り。

$ python askpykka.py
Hello, World!
Hello, Timeout!
Hello, Future!
Hello, Future-Timeout!

アクターの中で発生した例外を処理する

もし、アクターの返り値のある処理で例外が発生したことに気づくことができないと大変だ。 Pykka では ActorRef#ask() メソッド経由で呼び出された処理の中で例外が起こったときは、呼び出し元にそれが伝搬するようになっている。

次のサンプルコードではメッセージハンドラの中で意図的に例外を上げている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class ExceptionActor(pykka.ThreadingActor):

    def on_receive(self, message):
        raise Exception('Oops!')


def main():
    actor_ref = ExceptionActor.start()

    try:
        # ちなみに ActorRef.tell() を使うときは、投げっぱなしなので
        # 呼び出し先で例外が起こっても呼び出し元でキャッチできない
        actor_ref.ask({'name': 'World'})
    except Exception as e:
        print(e)

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。

$ python trypykka.py
Oops!

親スレッドが死んだときにアクターのスレッドも停止させたい

ちなみに、先ほどのサンプルコードで例外をキャッチしないでいるとどうなるか試してみただろうか。 実はアプリケーションの実行はそのまま継続することになる。 どうしてそんなことが起こるかというと、子スレッドがデーモンスレッドになっていないためだ。 アプリケーションは全てのスレッドが停止するまで処理を継続することになる。

もし、親スレッドが停止したときに子スレッドも停止させたい、つまり一家心中を図りたいときは次のようにする。 アクターには use_daemon_thread というアトリビュートが用意されているので、そのフラグを True にしよう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class ExceptionActor(pykka.ThreadingActor):
    # 親のスレッドが終了したら、同時に停止させたいときはこうする
    # 親が死んだとき一家心中になるスレッドをデーモンスレッドという
    use_daemon_thread = True

    def on_receive(self, message):
        raise Exception('Oops!')


def main():
    actor_ref = ExceptionActor.start()

    # デーモンスレッドになっていないと、ここで例外になって親スレッドが死ぬ
    actor_ref.ask({'name': 'World'})

    # すると、このコードが実行されないのでアクターのスレッドが停止しない
    actor_ref.stop()


if __name__ == '__main__':
    main()

こうすれば、例外を取りこぼしたことでメインスレッドが停止したときに、そこから起動されたアクターのスレッドも同時に停止する。

$ python daemonpykka.py
Traceback (most recent call last):
  File "daemonpykka.py", line 27, in <module>
    main()
  File "daemonpykka.py", line 20, in main
    actor_ref.ask({'name': 'World'})
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 435, in ask
    return future.get(timeout=timeout)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 52, in get
    compat.reraise(*self._data['exc_info'])
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/compat.py", line 24, in reraise
    raise value
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 304, in _handle_receive
    return self.on_receive(message)
  File "daemonpykka.py", line 13, in on_receive
    raise Exception('Oops!')
Exception: Oops!

アクターのフラグを切り替えて、挙動の違いを確認してみよう。

アクタープロキシを使って処理を呼び出す

これまでの例では pykka.ThreadingActor の持つ on_receive() というメソッドをオーバーライドすることでアクターを実装してきた。 しかし、このやり方はだいぶプリミティブで Pykka の低レイヤーな API を使っている。 実際には、アクタープロキシという機構を使って、アクターのメソッドをあたかも直接呼び出しているような風に処理できる。

次のサンプルコードではアクターで独自のメソッド greeting を定義している。 その呼び出し側では ActorRef から ActorProxy を生成して、あたかもメソッドを直接呼び出しているように扱っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def greeting(self):
        print('Hello, World!')


def main():
    actor_ref = GreetingActor.start()

    # ActorProxy を取得する
    actor_proxy = actor_ref.proxy()

    # ActorProxy では定義されているメソッドを直接呼べる (ように見える)
    actor_proxy.greeting()

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。

$ python proxypykka.py
Hello, World!

アクタープロキシを使って返り値のある場合

先ほどのサンプルコードでは、アクタープロキシを使って返り値のない場合を扱っていた。 今度は返り値のあるときを試してみる。 アクタープロキシを使って返り値を受け取るときは、実は必ず Future を介すことになる。

次のサンプルコードではアクタープロキシを使った上で返り値を受け取っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def greeting(self):
        return 'Hello, World!'


def main():
    actor_ref = GreetingActor.start()
    actor_proxy = actor_ref.proxy()

    # 返り値のある呼び出しでは Future オブジェクトが返る
    future = actor_proxy.greeting()
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

アクタープロキシ経由で実行したメソッドは、必ず Future 経由で返り値を受け取ることになる。

上記の実行結果は次の通り。

$ python futurepykka.py
Hello, World!

アクターのアトリビュートにアクタープロキシ経由でアクセスする

前述した通り、アクターの中と外は非同期の壁で断絶されていることから、直接同期的に触ることはできない。 それをすると、アクターの処理が競合する恐れがあるためだ。

では、アクターのアトリビュートを触るのにも、必ずアクセサに相当するメソッドを定義する必要があるのだろうか? だとすると相当に面倒くさいと思うんだけど、これは定義する必要がない。 アクタープロキシ経由であればアトリビュートも Future 経由にはなるが、触ることができる。

次のサンプルコードではアクターに message というアトリビュートを定義している。 メインスレッドからは、それをアクタープロキシ経由で読み出している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class MyActor(pykka.ThreadingActor):

    def __init__(self):
        super().__init__()
        # アクターでないオブジェクトをアトリビュートとして持つ
        self.message = 'Hello, World!'


def main():
    actor_ref = MyActor.start()
    actor_proxy = actor_ref.proxy()

    # アクターでないアトリビュートにアクセスすると Future が返る
    future = actor_proxy.message
    # 値を取り出す
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

ポイントとしては、アトリビュートへのアクセスであっても Future が返る点だ。 こうすればアクセスが非同期に処理されるため、処理が競合することはない。

上記の処理結果は次の通り。

$ python attrpykka.py
Hello, World!

アクターのメンバが持つメソッドをアクタープロキシ経由で呼び出す

先ほどのサンプルコードではアクターのメンバがただの文字列だった。 次は、これがメソッドを持ったインスタンスだったときの話。

次のサンプルコードでは、アクター MyActor がアクターでないアトリビュート NonActor のインスタンスをメンバに持っている。 それを、アクタープロキシ経由で外から呼び出している。 この場合、メンバとなるアクターでないオブジェクトには pykka_traversable というフラグが必要になる。 このフラグを立てることで、アクタープロキシから呼び出すことができるようになる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class NonActor(object):
    """アクターのアトリビュートになる非アクタークラス"""
    # pykka_traversable アトリビュートを True にするとプロキシ経由でアクセスできるようになる
    pykka_traversable = True

    def greeting(self):
        """プロキシ経由で外部から呼びたいメソッド"""
        return 'Hello, World!'


class MyActor(pykka.ThreadingActor):

    def __init__(self):
        super().__init__()
        # アクターでないオブジェクトをアトリビュートとして持つ
        self.non_actor = NonActor()


def main():
    actor_ref = MyActor.start()
    actor_proxy = actor_ref.proxy()

    # アクターのアトリビュートのオブジェクトが持つメソッドを呼び出せるようになる
    future = actor_proxy.non_actor.greeting()
    # Future が返ってくるので値を取り出す
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

このパターンでも返り値は Future になる。

上記のサンプルコードの実行結果は次の通り。

$ python methodpykka.py
Hello, World!

アクターモデルを使った並行処理

これまでのサンプルコードでは、アクターは定義しているものの平行処理をしていなかった。 なので、説明の都合上、仕方なかったとはいえ一体何のためにこんなことを、という印象だったかもしれない。 ここからは、ようやくアクターモデルを使って並行処理をしてみる。

アクターモデルの並行処理では、基本的にアクターをたくさん用意してタスクをそれぞれのアクターに振り分けていく。 アクターには、それぞれメッセージを処理するスレッドが起動しているので、処理をバラバラに並列で進めることができるわけ。

次のサンプルコードでは、 アクター MyActor に擬似的に処理に時間のかかるメソッド process() を用意している。 時間がかかる、というところはスレッドをスリープすることで表現した。 ここでは、時間のかかる処理を 4 回呼び出している。 それぞれの処理には 1 秒かかる。 一つのアクターで直列に処理すれば、単純に 4 秒かかるはずの処理を、ここでは二つのアクターに振り分けている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id, sleep_sec):
        """時間がかかる処理を模したメソッド"""
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1} s'.format(processing_id, sleep_sec))


def main():
    # アクターを二つ起動する
    actor_proxy_a = MyActor.start().proxy()
    actor_proxy_b = MyActor.start().proxy()

    # 処理を開始した時刻を記録しておく
    start_time = time.time()

    # それぞれのアクターに仕事を二つずつ割り当てる
    # 処理はアクターのスレッドがキューから取り出して開始する
    future1 = actor_proxy_a.process(1, 1)
    future2 = actor_proxy_a.process(2, 1)

    future3 = actor_proxy_b.process(3, 1)
    future4 = actor_proxy_b.process(4, 1)

    # 計算が終わるのを待つ (threading#join() のようなものと考えると分かりやすい)
    future1.get()
    future2.get()
    future3.get()
    future4.get()

    # 処理が終わったら時刻を記録する
    end_time = time.time()

    # 一連の処理にかかった時間を出力する
    elapsed_time = end_time - start_time
    print('Elapsed Time: {0} s'.format(elapsed_time))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

さて、処理が終わるのに実際は何秒かかるだろうか?

上記のサンプルコードの実行結果は次の通り。 二つのアクターで処理した結果として本来一つのアクターなら 4 秒かかるところ 2 秒で完了している。

$ python cuncurrentpykka.py
Completed: ID 3 in 1 s
Completed: ID 1 in 1 s
Completed: ID 2 in 1 s
Completed: ID 4 in 1 s
Elapsed Time: 2.0071158409118652 s

アクターモデルでも設計によっては競合が起こる

アクターモデルを使えば、競合が起きないというような説明をされることがたまにあるようだ。 しかし、その「競合が起きない」というのは、より低レイヤーな側面からアクターモデルを見たときの話だと思う。 あくまで、アクターのメッセージキューを処理するスレッドが一つだけなので、アクターの中で処理が競合しないというのに過ぎない。 より高レイヤーに、アプリケーションという側面からアクターモデルを見たとき、競合は容易に起こる。 それぞれの競合は全く別の次元の話なんだけど、もし混ぜて考えてしまうと危険だ。 次は、その競合について見てみよう。

サンプルコードでは、銀行口座を模したモデルを用意した。 AccountActor という銀行口座を模したアクターを UserActor というユーザを模したアクターが操作する。 UserActor はまず銀行口座から預金残高を読み取って、その値を元に預金残高を更新する。 ここでポイントなのは AccountActor の API がアトミックになっていないという点だ。 つまり、銀行口座の読み取りから更新までの間に別のアクターからの処理が割り込む余地がある。 サンプルコードでは、口座の読み取りから更新までの間に意図的にスリープを入れることで競合が起こりやすいようにしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

import pykka


class AccountActor(pykka.ThreadingActor):
    """銀行口座を模したアクター"""

    def __init__(self, initial_money=0):
        super(AccountActor, self).__init__()

        self._money = initial_money

    def get_balance(self):
        # 口座の残高を得る
        return self._money

    def set_balance(self, money):
        # 口座の残高を設定する
        self._money = money


class UserActor(pykka.ThreadingActor):
    """銀行口座を使うユーザを模したアクター"""

    def __init__(self, username, account_actor):
        super(UserActor, self).__init__()

        self._username = username
        self._account_actor = account_actor

    def save(self, amount):
        # 残高を読み出す
        future = self._account_actor.get_balance()
        balance = future.get()
        print('{0} read balance: {1}'.format(self._username, balance))
        # 残高を読んでから次の処理に移るのに時間がかかった!
        time.sleep(1)
        print('{0} sleep...'.format(self._username))
        # 口座残高を更新する
        new_balance = balance + amount
        future = self._account_actor.set_balance(new_balance)
        future.get()
        print('{0} set balance: {1}'.format(self._username, new_balance))


def main():
    # アクターを起動する
    account_actor_ref = AccountActor.start(100)
    account_actor_proxy = account_actor_ref.proxy()

    user_a_ref = UserActor.start('UserA', account_actor_proxy)
    user_a_proxy = user_a_ref.proxy()

    user_b_ref = UserActor.start('UserB', account_actor_proxy)
    user_b_proxy = user_b_ref.proxy()

    # ユーザ A と B が同時に口座に 50 を入金する
    future_a = user_a_proxy.save(50)
    future_b = user_b_proxy.save(50)

    future_a.get()
    future_b.get()

    # 最終的な口座の残高を出力する
    print('Final Balance: {0}'.format(account_actor_proxy.get_balance().get()))

    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

二つのユーザを模したアクターは銀行残高をそれぞれ 50 だけ増やすべく処理を実施する。 初期の預金残高は 100 なので、最終的には 200 になっていないとおかしい。

では、上記のサンプルコードを実行してみよう。

$ python conflictpykka.py
UserA read balance: 100
UserB read balance: 100
UserA sleep...
UserB sleep...
UserA set balance: 150
UserB set balance: 150
Final Balance: 150

最終的な口座残高は 150 となってしまった。 見事に競合が発生している。

問題は、アクターの API が更新に対してアトミックになっていなかった点だ。 次の改良したサンプルコードでは、預金残高を更新するメソッドを update_balance() という形でアトミックにしている。 現在の預金残高を元に、口座のアクター自体が更新をかけるので、間に別のアクターの処理が入り込む余地はない。 アクターの中はメッセージキューを処理する一つのスレッドで処理されているので、同時に同じメソッドが走る心配はない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class AccountActor(pykka.ThreadingActor):

    def __init__(self, initial_money=0):
        super(AccountActor, self).__init__()

        self._money = initial_money

    def get_balance(self):
        return self._money

    def update_balance(self, money):
        # アトミックに口座残高を更新する
        self._money = self._money + money


class UserActor(pykka.ThreadingActor):

    def __init__(self, username, account_actor):
        super(UserActor, self).__init__()

        self._username = username
        self._account_actor = account_actor

    def save(self, amount):
        # 口座残高を更新する
        future = self._account_actor.update_balance(amount)
        future.get()
        print('{0} update balance: {1}'.format(self._username, amount))


def main():
    # アクターを起動する
    account_actor_ref = AccountActor.start(100)
    account_actor_proxy = account_actor_ref.proxy()

    user_a_ref = UserActor.start('UserA', account_actor_proxy)
    user_a_proxy = user_a_ref.proxy()

    user_b_ref = UserActor.start('UserB', account_actor_proxy)
    user_b_proxy = user_b_ref.proxy()

    # ユーザ A と B が同時に口座に 50 を入金する
    future_a = user_a_proxy.save(50)
    future_b = user_b_proxy.save(50)

    future_a.get()
    future_b.get()

    # 最終的な口座の残高を出力する
    print('Final Balance: {0}'.format(account_actor_proxy.get_balance().get()))

    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。 今度はちゃんと預金残高が 200 に更新されている。

$ python conflictpykka2.py
UserA update balance: 50
UserB update balance: 50
Final Balance: 200

アクターモデルでも設計によってはデッドロックが起こる

典型的なデッドロックは、マルチスレッドでリソースをロックする順番が不定になっていたときに起こる。 アクターモデルはロックを使わないので、それならデッドロックが起こらないかというと、そんなことはない。

次のサンプルコードではデッドロックを発生させている。 ActorAActorB のメソッドを呼んで、ActorBActorA のメソッドを呼ぶ、というように操作が交差している。 それぞれのメソッドは各アクターのスレッドで非同期に処理されるので、返り値で得られる Future を get() すると完了するまでブロックする。 すなわち、双方がお互いの処理の完了を待ってデッドロックを起こしてしまう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import signal
import os

import pykka
from pykka import debug


class ActorA(pykka.ThreadingActor):

    def method_a(self, actor_b_proxy):
        # 2. ActorB の method_b を呼ぶ
        future = actor_b_proxy.method_b()
        # 3. Future の中身を取り出そうとすると ActorA のスレッドは取り出せるまでブロックする
        future.get()


class ActorB(pykka.ThreadingActor):

    def __init__(self, actor_a_proxy):
        super(ActorB, self).__init__()
        self.actor_a_proxy = actor_a_proxy

    def method_b(self):
        # 4. ActorA の method_a を呼ぶ (本当は自身のプロキシを渡す必要があるけど)
        future = self.actor_a_proxy.method_a()
        # 5. Future の中身を取り出そうとすると ActorB のスレッドは取り出せるまでブロックする
        future.get()
        # XXX: ActorA のスレッドは既にブロックしているのでデッドロックを起こす


def main():
    """意図的にデッドロックを起こしてみる"""

    # デバッグレベルのログが出力されるようにする
    logging.basicConfig(level=logging.DEBUG)

    # シグナル SIGUSR1 でスレッドのトレースバックを出力する
    signal.signal(signal.SIGUSR1, debug.log_thread_tracebacks)

    # アクターを起動する
    actor_a_ref = ActorA.start()
    actor_a_proxy = actor_a_ref.proxy()
    # ActorB には ActorA のプロキシを渡しておく
    actor_b_ref = ActorB.start(actor_a_proxy)
    actor_b_proxy = actor_b_ref.proxy()

    # 1. ActorA の method_a を呼ぶ (引数として ActorB を渡す)
    actor_a_proxy.method_a(actor_b_proxy)

    # メインスレッドでは PID を出力する
    pid = os.getpid()
    print(f'PID: {pid}')

    # アクターがデーモンスレッドではないので、ここまできてもプログラムは終了しない


if __name__ == '__main__':
    main()

上記を実行すると、何も表示されずに処理が止まる。

$ python deadlockpykka.py
DEBUG:pykka:Registered ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Starting ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Registered ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
DEBUG:pykka:Starting ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
PID: 9199

サンプルコードでプロセス ID を表示するようにしているところに気づいただろうか? また Pykka の debug.log_thread_tracebacks という関数がシグナル SIGUSR1 に対して登録されている。 そこで、別のターミナルからこのハンドラに対してシグナルを送ってみよう。

$ kill -SIGUSR1 9199

すると、今のスレッドが何処を処理しているのかトレースバックが表示される。 この表示からは、二つのアクターのスレッドが future.get() でブロックしていることが分かる。

$ python deadlockpykka.py
DEBUG:pykka:Registered ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Starting ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Registered ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
DEBUG:pykka:Starting ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
PID: 9199
CRITICAL:pykka:Current state of ActorB-2 (ident: 123145451573248):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "deadlockpykka.py", line 31, in method_b
    future.get()
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 50, in get
    self._data = self._queue.get(True, timeout)
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

CRITICAL:pykka:Current state of ActorA-1 (ident: 123145446318080):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "deadlockpykka.py", line 18, in method_a
    future.get()
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 50, in get
    self._data = self._queue.get(True, timeout)
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

CRITICAL:pykka:Current state of MainThread (ident: 140736596534208):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1290, in _shutdown
    t.join()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/debug.py", line 68, in log_thread_tracebacks
    stack = ''.join(traceback.format_stack(frame))

Pykka の並行処理に関しては、上記のようにしてデバッグができる。

スレッドプールっぽいものを作って並行処理をしてみる

次はもうちょっと実践的に、スレッドプールっぽく並行処理をしてみよう。 とはいえ、あまり汎用的には作らずベタ書きだけど。

次のサンプルコードでは、先ほどの例と同じように MyActor に処理に時間のかかるメソッド process() を用意している。 アクターを配列で 10 個生成した上で、タスクを 20 回呼び出してみよう。 ただし、通常のスレッドプールのような形で終わったものから新しいタスクを割り当てる、というものではない。 あらかじめ、各アクターに決まった番号のタスクを割り当てるような形になっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time
import random

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id):
        """時間がかかる処理を模したメソッド"""
        # 処理時間をランダムに用意する
        sleep_sec = random.randint(1, 10) / 10
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1} s'.format(processing_id, sleep_sec))
        # 処理の ID を返す
        return processing_id


def main():
    # スレッドプールのようにアクターを用意する
    actor_proxy_pool = [MyActor.start().proxy() for _ in range(10)]

    # それぞれのアクターに仕事を割り当てていく
    # かかる時間はランダムにする
    futures = [
        actor_proxy_pool[i % len(actor_proxy_pool)].process(i)
        for i in range(20)
    ]

    # すべての処理が完了するのを待ち合わせる
    answers = pykka.get_all(futures)

    print('*** Calculation Completed ***')

    # 完了した結果を出力する
    for answer in answers:
        print('ID: {0}'.format(answer))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

実行結果は次の通り。 各タスクが順番バラバラで実行されているのが分かる。

$ python tpoolpykka.py
Completed: ID 3 in 0.1 s
Completed: ID 0 in 0.3 s
Completed: ID 7 in 0.3 s
Completed: ID 13 in 0.2 s
Completed: ID 1 in 0.5 s
Completed: ID 4 in 0.5 s
Completed: ID 5 in 0.6 s
Completed: ID 2 in 0.7 s
Completed: ID 8 in 0.7 s
Completed: ID 10 in 0.4 s
Completed: ID 17 in 0.4 s
Completed: ID 6 in 0.8 s
Completed: ID 9 in 1.0 s
Completed: ID 15 in 0.5 s
Completed: ID 11 in 0.6 s
Completed: ID 12 in 0.5 s
Completed: ID 16 in 0.5 s
Completed: ID 14 in 0.9 s
Completed: ID 19 in 0.5 s
Completed: ID 18 in 0.9 s
*** Calculation Completed ***
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9
ID: 10
ID: 11
ID: 12
ID: 13
ID: 14
ID: 15
ID: 16
ID: 17
ID: 18
ID: 19

出力の最後を見て分かる通り、実行結果として得られる配列の順番は変わっていない。 これは、単純に内部でやっているのが順番に Future#get() をしていく、という処理だからだ。 とはいえ、最終的に得られる結果の順番が入れ替わらないという特性は並行処理をする上で優れた特徴だと思う。 場合によっては終わったものから返ってきて、後からソートし直さないといけないことなんかもあるし。

ラウンドロビンで処理するメッセージルータを実装してみる

次は、お試しで作ってみたメッセージルータについて。 これはほんとに試しに作ってみただけなのでプルーフオブコンセプトと思ってほしい。

次のサンプルコードでは MyActor をラップするメッセージルータの RoundRobinRouter を定義している。 RoundRobinRouter では、内部的に MyActor のプロキシを複数生成して、処理を振り分ける。 先ほどのべた書きのスレッドプールを、もうちょっと洗練させた感じ。 RoundRobinRouter は、まず処理をフォワードしたいアクターと同時並行数 (アクターをいくつ生成するか) を指定して起動する。 その上で RoundRobinRouter#forward() メソッドに呼び出したいメソッド名と引数を指定するだけ。 注意点としては、返り値が二重の Future になっているところ。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time
import random

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id):
        """時間がかかる処理を模したメソッド"""
        # 処理時間をランダムに用意する
        sleep_sec = random.randint(1, 10) / 10
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1}s by {2}'.format(
            processing_id,
            sleep_sec,
            self.actor_urn,
        ))
        # 処理の ID を返す
        return processing_id


class RoundRobinRouter(pykka.ThreadingActor):
    """ラップしたアクターにラウンドロビンで処理を割り当てていくメッセージルータ"""

    def __init__(self, actor_type, size):
        super(RoundRobinRouter, self).__init__()

        # アクターのプロキシを生成しておく
        self._proxy_pool = [actor_type.start().proxy()
                            for _ in range(size)]
        # 次に処理させるアクターを決めるジェネレータ
        self._index_gen = self._next_actor_index()

    def forward(self, name, *args, **kwargs):
        # 次の処理させるアクターのインデックス
        actor_index = next(self._index_gen)
        # 次の処理させるアクターのプロキシ
        actor_proxy = self._proxy_pool[actor_index]
        # フォワード対象のアトリビュートをプロキシから取り出す
        attribute = getattr(actor_proxy, name)
        # 引数を渡して呼び出す
        return attribute(*args, **kwargs)

    def _next_actor_index(self):
        """次の処理対象のアクターのインデックスを生成するジェネレータ"""
        i = 0
        while True:
            yield i
            # 0, 1, 2, 3 ... 0, 1, 2, 3 と繰り返す
            i = (i + 1) % len(self._proxy_pool)


def main():
    # 五つの MyActor をラップしたルータを生成する
    router_ref = RoundRobinRouter.start(MyActor, 5)
    router_proxy = router_ref.proxy()

    futures = [router_proxy.forward('process', i)
               for i in range(10)]

    # このルータを経由させると Future の二段重ねになる
    inner_futures = pykka.get_all(futures)
    answers = pykka.get_all(inner_futures)

    print('*** Calculation Completed ***')

    for answer in answers:
        print('ID: {0}'.format(answer))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 タスクを処理したアクターの識別子も同時に出力するようになっている。 見ると、タスクの識別子が 5 つの周期で同じアクターによって処理されていることが分かる。 上手く動作しているようだ。

$ python routerpykka.py
Completed: ID 1 in 0.3s by urn:uuid:a5d3f2e6-6d59-4351-9be8-e47023428e99
Completed: ID 2 in 0.6s by urn:uuid:ee6f93cf-66df-4500-b416-84defff8b195
Completed: ID 3 in 0.6s by urn:uuid:c6003463-87fc-4988-b8bb-34186e867630
Completed: ID 0 in 0.7s by urn:uuid:9a2fe8bc-49fe-4b19-96d5-6658ecae3677
Completed: ID 7 in 0.1s by urn:uuid:ee6f93cf-66df-4500-b416-84defff8b195
Completed: ID 4 in 0.8s by urn:uuid:0e3b5668-6825-43f5-bd95-9cf2c706a03d
Completed: ID 6 in 0.5s by urn:uuid:a5d3f2e6-6d59-4351-9be8-e47023428e99
Completed: ID 9 in 0.3s by urn:uuid:0e3b5668-6825-43f5-bd95-9cf2c706a03d
Completed: ID 5 in 0.8s by urn:uuid:9a2fe8bc-49fe-4b19-96d5-6658ecae3677
Completed: ID 8 in 1.0s by urn:uuid:c6003463-87fc-4988-b8bb-34186e867630
*** Calculation Completed ***
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9

まとめ

今回は Python でアクターモデルを実装するためのフレームワークである Pykka を試してみた。

アクターモデルというのは、並行処理のプログラミングモデルの一つ。 ただし、マルチスレッドやマルチプロセスよりも、抽象度の高い概念になっている。 対比するとしたら共有メモリやロック、RPC などがそれに当たると思う。

アクターモデルの概念を理解する上で重要なのは、アクターを構成する三つの要素を理解すること。 一つ目がメッセージキュー、二つ目がメッセージハンドラ、三つ目がそれらをディスパッチするスレッド。 生成されたアクターには、それぞれにこの三つの要素が必ず備わっている。 メッセージキューとスレッドはあらかじめアクターごとに一つずつ生成されるし、メッセージハンドラは自分で定義する。

アクターモデルでは、内部で競合を起こさないようにアクターの内外が非同期の壁で分断されている。 アクターの内部に、外側から同期的に直接触れることはできない。 必ず、触るとしたら Future を経由することになる。 同期的に触っているように見えたとしても、内部的には必ず経由している。

アクターモデルで並行処理を実現するには、アクターを横にたくさん並べてタスクを割り当てていく。 ただし、それぞれのタスクはアトミックに作られていないといけない。 アクターモデルで競合が起こらないというのは、あくまで「低レイヤーで内部的には」という但し書きがつく。 また、ロックがないからといって、デッドロックが起こらないというわけでもない。

Pykka の場合、Akka の機能を全て揃えているわけではないので、割りとよく使いそうな機能でも自分で書く必要がある。 また、正直アプリケーションで直接 Pykka を使うというシチュエーションは、あまり無いのかもしれないと思った。 なぜなら Pykka の提供する API は、直接使うにはあまりに低レイヤーすぎるからだ。 なので、Pykka をベースにした何らかのフレームワークをまず書いて、その上でアプリケーションを組むことになりそうな印象を受けた。

いじょう。

Python: Keras/TensorFlow の学習を GPU で高速化する (Ubuntu 16.04 LTS)

以前、このブログで Keras/TensorFlow の学習スピードを GPU を使って速くする記事を書いた。 ただし、このとき使った OS は Mac OS X (macOS Sierra) だった。

blog.amedama.jp

とはいえ NVIDIA の dGPU を積んだ Mac がどれだけあるんだというと、正直なかなか無いと思う。 実際にやってみるとしたら Linux だよねということで、今回は Ubuntu 16.04 LTS を使う場合について書く。

インストールの手順については次の公式ドキュメントをベースに進める。

Installing TensorFlow on Ubuntu  |  TensorFlow

環境について

今回使った OS のバージョンなどは次の通り。

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.2 LTS"
$ uname -r
4.4.0-66-generic

また、学習に GPU を使う場合はハードウェアの環境にも制約があるので、ここで記述しておく。 まず、当たり前だけど大前提として CUDA をサポートしている NVIDIA のグラフィックカードが必ず必要になる。 ちなみに TensorFlow のドキュメントを見ると一部 Radeon も対応しているらしいことが書かれていたけど、試してはいない。

さらに、NVIDIA のグラフィックカードなら何でも良いのかというと、そういうわけでもない。 次の NVIDIA 公式ページを見てほしい。 ここには NVIDIA の各 GPU ごとの Compute Capability という数値が載っている。

CUDA GPUs | NVIDIA Developer

Compute Capability という字面だけを見ると、性能を表す数値のような印象を受けるけど、そうではない。 数値はパフォーマンスとは関係がなく、あくまで GPU の世代というか載っている機能のバージョンらしい。 問題は、この数値が少なくとも 3.0 以上ないと現行の CUDA 8.0 がサポートしていない、という点だ。

また、PyPI で配布されている TensorFlow の GPU 版バイナリパッケージは CUDA 8.0 でビルドされている。 なので、もちろん動かす環境にも CUDA 8.0 が必要になる。 以上のことから Compute Capability が 3.0 以上の GPU を用意すべきだ。

ちなみに、ドキュメントを見ると一応 TensorFlow は今のところ CUDA 7.5 もサポートしているらしい。 CUDA 7.5 であれば Compute Capability は 2.0 以上をサポートしている。 ただし、そのときは自分でソースコードからビルドしなければいけない。 それについて、この記事では扱わない。

今回 GPU には GeForce GTX 1050 Ti を用意した。 これの Compute Capability は 6.1 となっている。 具体的なグラフィックカードは次のもの。

MSI GeForce GTX 1050 Ti 4G OC グラフィックスボード VD6191

MSI GeForce GTX 1050 Ti 4G OC グラフィックスボード VD6191

GPU のチョイスについては、現時点 (2017/3) で最もコストパフォーマンスが高くなるものを選んだ。 PassMark というベンチマークのスコアを値段で割った値を考えると GeForce GTX 1050 Ti が一番高くなる。 あとは、CPU に比べると GPU の学習速度はそれこそ一桁か、下手すると二桁は違ってくる。 つまり、GPU が載っていること自体がまずは重要で、性能はその次かなという感じ。 ようするに 60 秒かかった処理が 6 秒になるのは大きく違うけど、6 秒が 3 秒になってどれだけ嬉しいか?っていう。 それもあってハイエンドモデルではなくエントリーモデルを選ぶことにした。

グラフィックカードを刺して GPU を Ubuntu 16.04 LTS が認識することを確認しておこう。

$ lspci | grep -i nvidia
01:00.0 VGA compatible controller: NVIDIA Corporation Device 1c82 (rev a1)
01:00.1 Audio device: NVIDIA Corporation Device 0fb9 (rev a1)

CUDA Toolkit をインストールする

インストールの第一歩として CUDA Toolkit をインストールする。 そのために、まずは NVIDIA 公式サイトを参照する。

developer.nvidia.com

インストール方法は色々とあるけど、ここでは CUDA のリモートリポジトリを APT に登録するやり方を取る。 これなら、後からマイナーアップデートがあったときにも自動で更新できる。 上記ウェブサイトで、次のように操作して deb ファイルのダウンロード URL を取得しよう。

Linux > x86_64 > Ubuntu > 16.04 > deb (network)

上記で得られた URL から deb ファイルをダウンロードしてくる。

$ wget http://developer.download.nvidia.com/compute/cuda/repos/ubuntu1604/x86_64/cuda-repo-ubuntu1604_8.0.61-1_amd64.deb

deb ファイルをインストールする。 これで CUDA のリポジトリが APT パッケージシステムに登録される。

$ sudo dpkg -i cuda-repo-ubuntu1604_8.0.61-1_amd64.deb
$ sudo apt-get update

これで APT 経由で CUDA をインストールできるようになる。 ちなみに、インストールにはとても長い時間がかかるので覚悟しておこう。 先ほどのインストール方法でローカルインストールを選んだ場合には 1.9GB のファイルを落とすことになる。 かかる時間は、推して知るべし。

$ sudo apt-get -y install cuda

上記が終わると、依存パッケージとして一緒に NVIDIA のグラフィックドライバなんかも入る。

$ dpkg -l | grep nvidia-[0-9]
ii  nvidia-375                            375.26-0ubuntu1                          amd64        NVIDIA binary driver - version 375.26
ii  nvidia-375-dev                        375.26-0ubuntu1                          amd64        NVIDIA binary Xorg driver development files

cuDNN をインストールする

次に cuDNN をインストールする。

cuDNN のインストールには、あらかじめ NVIDIA のサイトでデベロッパー登録が必要になる。 登録を済ませると cuDNN をダウンロードできるようになる。

https://developer.nvidia.com/rdp/cudnn-download

cuDNN の現行のバージョンは 5.1 となっている。 ダウンロードに使うパッケージは CUDA Toolkit のバージョンと組み合わせになっている。 今回であれば v5.1 + CUDA 8.0 のパッケージを選ぼう。

Download cuDNN v5.1 (Jan 20, 2017), for CUDA 8.0 > cuDNN v5.1 Library for Linux

ダウンロードしてきたら、それを CUDA のインストールされたディレクトリに放り込む。

$ sudo tar xvf cudnn-8.0-linux-x64-v5.1.tgz -C /usr/local
cuda/include/cudnn.h
cuda/lib64/libcudnn.so
cuda/lib64/libcudnn.so.5
cuda/lib64/libcudnn.so.5.1.10
cuda/lib64/libcudnn_static.a

libcupti-dev をインストールする

次に The CUDA Profiling Tools Interface もインストールしておく。

$ sudo apt-get -y install libcupti-dev

ここまで終わったら、一旦再起動しておこう。

$ sudo shutdown -r now

Keras/TensorFlow (GPU)

さて、次はいよいよ Keras/TensorFlow をインストールする…と言いたいところだけど、その前に。 システムの Python 実行環境を汚したくないので Python の仮想環境を作れるようにしよう。

ここでは virtualenvwrapper を使う。

$ sudo apt-get -y install virtualenvwrapper

シェルを起動し直す。

$ exec $SHELL

これで Python 仮想環境が作れるようになった。 tensorflow-with-gpu という名前で Python3 の仮想環境を作る。

$ mkvirtualenv tensorflow-with-gpu -p $(which python3)

これで最低限のパッケージが入った、システムから独立した仮想環境ができた。

(tensorflow-with-gpu) $ pip list --format=columns
Package       Version
------------- --------
appdirs       1.4.3
packaging     16.8
pip           9.0.1
pkg-resources 0.0.0
pyparsing     2.2.0
setuptools    34.3.2
six           1.10.0
wheel         0.30.0a0

ここに Keras と GPU 版 TensorFlow をインストールする。

(tensorflow-with-gpu) $ pip install keras tensorflow-gpu

インストールすると、こんな感じ。

(tensorflow-with-gpu) $ pip list --format=columns | grep -i -e keras -e tensorflow
Keras          1.2.2
tensorflow-gpu 1.0.1

ちなみに、以降はシェルのプレフィックスを表記しないけど Python 仮想環境上で実行し続けている。

GPU で学習させる

まずはベンチマーク用のアプリケーションで使うデータセット (MNIST) をダウンロードしよう。 Python の REPL を起動する。

$ python

Keras の mnist パッケージをインポートする。

>>> from keras.datasets import mnist
Using TensorFlow backend.
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcublas.so.8.0 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcudnn.so.5 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcufft.so.8.0 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcurand.so.8.0 locally

load_data() 関数でデータセットのダウンロードが始まる。 これには少し時間がかかる。

>>> mnist.load_data()
Downloading data from https://s3.amazonaws.com/img-datasets/mnist.pkl.gz
...
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0]]], dtype=uint8), array([7, 2, 1, ..., 4, 5, 6], dtype=uint8)))

終わったら REPL から抜ける。

>>> exit()

続いて学習のベンチマークに使うアプリケーションをダウンロードしてくる。

$ curl -O https://raw.githubusercontent.com/fchollet/keras/master/examples/mnist_cnn.py
$ echo 'K.clear_session()' >> mnist_cnn.py

ベンチマーク用のアプリケーションを実行する。 これは MNIST データセットを CNN で認識するものになっている。 初めての実行だと GPU の環境を整えるので学習を始まるまで時間がかかるかも。

$ time python mnist_cnn.py
Using TensorFlow backend.
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcublas.so.8.0 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcudnn.so.5 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcufft.so.8.0 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:135] successfully opened CUDA library libcurand.so.8.0 locally
X_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples
Train on 60000 samples, validate on 10000 samples
Epoch 1/12
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:910] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
I tensorflow/core/common_runtime/gpu/gpu_device.cc:885] Found device 0 with properties:
name: GeForce GTX 1050 Ti
major: 6 minor: 1 memoryClockRate (GHz) 1.455
pciBusID 0000:01:00.0
Total memory: 3.94GiB
Free memory: 3.84GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:906] DMA: 0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:916] 0:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:975] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 1050 Ti, pci bus id: 0000:01:00.0)
60000/60000 [==============================] - 8s - loss: 0.3661 - acc: 0.8871 - val_loss: 0.0864 - val_acc: 0.9726
Epoch 2/12
60000/60000 [==============================] - 6s - loss: 0.1337 - acc: 0.9604 - val_loss: 0.0613 - val_acc: 0.9806
Epoch 3/12
60000/60000 [==============================] - 6s - loss: 0.1046 - acc: 0.9694 - val_loss: 0.0524 - val_acc: 0.9838
Epoch 4/12
60000/60000 [==============================] - 6s - loss: 0.0879 - acc: 0.9737 - val_loss: 0.0428 - val_acc: 0.9858
Epoch 5/12
60000/60000 [==============================] - 6s - loss: 0.0755 - acc: 0.9773 - val_loss: 0.0393 - val_acc: 0.9870
Epoch 6/12
60000/60000 [==============================] - 6s - loss: 0.0683 - acc: 0.9801 - val_loss: 0.0368 - val_acc: 0.9875
Epoch 7/12
60000/60000 [==============================] - 6s - loss: 0.0648 - acc: 0.9806 - val_loss: 0.0367 - val_acc: 0.9888
Epoch 8/12
60000/60000 [==============================] - 6s - loss: 0.0593 - acc: 0.9820 - val_loss: 0.0353 - val_acc: 0.9889
Epoch 9/12
60000/60000 [==============================] - 6s - loss: 0.0548 - acc: 0.9833 - val_loss: 0.0328 - val_acc: 0.9890
Epoch 10/12
60000/60000 [==============================] - 6s - loss: 0.0529 - acc: 0.9845 - val_loss: 0.0316 - val_acc: 0.9897
Epoch 11/12
60000/60000 [==============================] - 6s - loss: 0.0508 - acc: 0.9848 - val_loss: 0.0308 - val_acc: 0.9902
Epoch 12/12
60000/60000 [==============================] - 6s - loss: 0.0484 - acc: 0.9852 - val_loss: 0.0298 - val_acc: 0.9899
Test score: 0.0297728348608
Test accuracy: 0.9899

real  1m28.273s
user  1m39.684s
sys   0m8.740s

全体の実行が一分半ほどで終わった。 一回のエポックは大体 6 秒ほどで終わっている。

ちなみに GPU の仕事っぷりは nvidia-smi コマンドから見られる。

$ nvidia-smi
Sun Mar 12 20:24:56 2017
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 375.26                 Driver Version: 375.26                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX 105...  Off  | 0000:01:00.0      On |                  N/A |
| 35%   38C    P0    58W /  75W |   3862MiB /  4037MiB |     77%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID  Type  Process name                               Usage      |
|=============================================================================|
|    0      1510    G   /usr/lib/xorg/Xorg                              59MiB |
|    0      3374    C   python                                        3799MiB |
+-----------------------------------------------------------------------------+

毎秒ごとに表示を更新したいときは、こんな感じで。

$ nvidia-smi -l 1

CPU で学習させる

ちなみに対比として CPU を使ったときの例も載せておく。

使った CPU は次の通り。 第一世代 Core i7 なので、なかなか古い。 現行世代なら Core i3 くらいの性能らしい。

$ cat /proc/cpuinfo | grep -i name | head -n 1
model name      : Intel(R) Core(TM) i7 CPU         860  @ 2.80GHz
$ cat /proc/cpuinfo | grep -i name | wc -l
8

余談ながら、当然チップセットやマザーボードも古いので今回使ったグラフィックカードを認識するかドキドキした。

先ほどインストールした GPU 版 TensorFlow をアンインストールして CPU 版 TensorFlow をインストールする。

$ pip uninstall -y tensorflow-gpu
$ pip install tensorflow

先ほどと同じようにベンチマーク用のアプリケーションを実行する。

$ time python mnist_cnn.py
Using TensorFlow backend.
X_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples
Train on 60000 samples, validate on 10000 samples
Epoch 1/12
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
60000/60000 [==============================] - 117s - loss: 0.3733 - acc: 0.8858 - val_loss: 0.0885 - val_acc: 0.9723
Epoch 2/12
60000/60000 [==============================] - 117s - loss: 0.1347 - acc: 0.9596 - val_loss: 0.0635 - val_acc: 0.9799
Epoch 3/12
60000/60000 [==============================] - 115s - loss: 0.1031 - acc: 0.9693 - val_loss: 0.0565 - val_acc: 0.9818
Epoch 4/12
60000/60000 [==============================] - 115s - loss: 0.0887 - acc: 0.9740 - val_loss: 0.0448 - val_acc: 0.9852
Epoch 5/12
60000/60000 [==============================] - 114s - loss: 0.0780 - acc: 0.9773 - val_loss: 0.0415 - val_acc: 0.9868
Epoch 6/12
60000/60000 [==============================] - 113s - loss: 0.0707 - acc: 0.9788 - val_loss: 0.0376 - val_acc: 0.9870
Epoch 7/12
60000/60000 [==============================] - 112s - loss: 0.0651 - acc: 0.9808 - val_loss: 0.0352 - val_acc: 0.9893
Epoch 8/12
60000/60000 [==============================] - 112s - loss: 0.0604 - acc: 0.9818 - val_loss: 0.0352 - val_acc: 0.9888
Epoch 9/12
60000/60000 [==============================] - 112s - loss: 0.0555 - acc: 0.9840 - val_loss: 0.0332 - val_acc: 0.9884
Epoch 10/12
60000/60000 [==============================] - 112s - loss: 0.0542 - acc: 0.9835 - val_loss: 0.0322 - val_acc: 0.9895
Epoch 11/12
60000/60000 [==============================] - 111s - loss: 0.0495 - acc: 0.9853 - val_loss: 0.0319 - val_acc: 0.9897
Epoch 12/12
60000/60000 [==============================] - 110s - loss: 0.0477 - acc: 0.9857 - val_loss: 0.0326 - val_acc: 0.9893
Test score: 0.032642004689
Test accuracy: 0.9893

real  22m54.715s
user  123m6.744s
sys   46m53.296s

今度は 23 分ほどかかった。

GPU が 1 分 28 秒で終わったのに対して CPU では 22 分 54 秒かかっている。 つまり、GPU を使うことで学習が 15.6 倍高速化できた。 一回のエポック当たりでは 19.1 倍速くなっている。

まとめ

今回は OS として Ubuntu 16.04 LTS を使って Keras/TensorFlow の学習を GPU で高速化してみた。 CUDA がサポートしている GPU は Compute Capability という数値で表現されている。 この数値が大きいものほど、新しい世代のものになっている。 使う GPU については NVIDIA で、少なくとも Compute Capability が 3.0 以上のものを選ぼう。 もちろん、将来の CUDA で最小要件の Compute Capability は上がっていくはずなので、なるべく大きい方が良い。

いじょう。

MSI GeForce GTX 1050 Ti 4G OC グラフィックスボード VD6191

MSI GeForce GTX 1050 Ti 4G OC グラフィックスボード VD6191