CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: Pykka でアクターモデルについて学ぶ

アクターモデルというのは、並行処理のプログラミングモデルの一つだ。 並行処理という言葉からは、まずマルチスレッドとかをイメージすると思うけど、それよりも抽象度の高い概念となっている。 つまり、アクターモデルというのはマルチスレッドなどを用いて構築することになる。 どちらかといえばプロセス間通信 (IPC) の技法であって、共有メモリやロック、RPC と比較するものかもしれない。

そんなアクターモデルは、概念とか使ったときの嬉しさを理解・実感するのがなかなか難しいモデルだとも思う。 理由としては、使い始めるまでに必要なコード量が多かったり、それなりの規模のアプリケーションで使わないとメリットが分かりづらい点が挙げられる。 ただ、これはあくまで主観的なものだけど、アクターモデルをベースに組まれたアプリケーションは規模が大きくなっても並行処理をしているコードが読みやすい。 共有メモリやロックを使う場合だと、平行処理できるパートの量と可読性がトレードオフになりがちな気がするのとは対照的な感じがしている。

上記の理由としては、それぞれの手法のアプローチが次のように根本的に異なるためだと思う。

  • 共有メモリやロックを使うやり方
    • 考え方のベースはシングルスレッド
    • ボトルネックになっている末端の処理を局所的に並行にしていく
  • アクターモデル
    • 考え方のベースはマルチスレッド
    • 末端の処理はシングルスレッドで処理される
      • ただし、アクターを横に並べることで並行度を上げられる

今回は、そんなアクターモデルを Python で実装するためのフレームワークである Pykka を使ってみることにする。 ざっと調べた感じ、この Pykka が Python で汎用的なアクターモデルを実装するためのパッケージとしては最も有名そうだった。 Pykka は Scala/Java でアクターモデルを実装するためのフレームワークである Akka の影響を受けている。 使われている用語の多くも Akka から取られているらしいけど、まあ使う分には特に意識する必要はない。

ただし、Pykka は単純に Akka を Python で再実装したものではない。 次のコンポーネントは明示的にサポートしないことが宣言されている。 もし欲しければ自分で作り込まなきゃいけない。

  • スーパーバイザ
    • セルフヒーリングとかを実現するやつ
  • メッセージルータ
    • 並行処理を書きやすくしたりアプリケーションの骨子を作るためのやつ
  • ネットワーク越しの分散処理
    • 並行処理のタスクをネットワーク越しの各ホストに分配するやつ

今回使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.3
BuildVersion:   16D32
$ python --version
Python 3.6.0

インストール

インストールは Python のパッケージマネージャである pip を使ってさくっとできる。

$ pip install pykka

もし pip が入っていないときはインストールしよう。

$ curl https://bootstrap.pypa.io/get-pip.py | sudo python

あと、システムの実行環境を汚さないように Python 仮想環境を使えるようにしておくのがおすすめ。 これには例えば virtualenv なんかを使う。

$ sudo pip install virtualenv

そこらへんはお好みで。

はじめてのアクターモデル

アクターモデルでは、その名の通りアクターというコンポーネントを組み合わせてアプリケーションを構築していく。

アクターという概念を構成する要素は、実のところたったの三つしかない。 まず、他のアクターから何らかのデータを受け取るために用意されたメッセージキューが一つ目。 そして、その受け取ったデータを渡す先となるメッセージハンドラが二つ目。 最後に、メッセージキューからデータを取り出して、それをメッセージハンドラにディスパッチする役目のスレッドが三つ目だ。 この三つの要素が、それぞれのアクターインスタンスに必ず備わっている。 メッセージキューとスレッドはデフォルトで一つずつ、メッセージハンドラについては自分で何をするか記述する。

重要な点としては、アクター同士はメモリなどを共有することがない。 することがないというよりも、する必要がない・するとアクターモデルを使う意味が薄れるという方が正しい。 アクターモデルでは、何かを共有することなくお互いのメッセージキューにメッセージを放り込み合うことで情報をやり取りして処理を進めていく。 いわゆるメッセージパッシングと呼ばれるものだ。

また、アクターの中と外の世界は非同期の壁で断絶されている。 アクターの中身を、その外側から同期的に直接触ることはできない。 触ることができてしまうと、処理の競合が起こってしまうからだ。 アクターの中を触るには、いわゆる Future を必ず介すことになる。 処理はキューイングされていつかは実行されるけど、具体的にそれがいつかは分からない状態でアクセスする。

説明が長くなったけど、そろそろ具体的なサンプルコードを見ていくことにしよう。 以下は Pykka を使って作る、最も単純なアプリケーションの一つだ。 このアプリケーションではメッセージを受け取って、それを出力するだけのアクターを定義している。 説明はコードの中にコメントとして記述している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):
    """メッセージを受け取って挨拶をするアクター"""

    def on_receive(self, message):
        """メッセージが届いたときに実行されるハンドラ"""
        # メッセージの中から名前を取り出す
        name = message['name']
        # 内容をプリントする
        print('Hello, {0}!'.format(name))


def main():
    # アクターを起動する
    actor_ref = GreetingActor.start()

    # アクターにメッセージを送る (一方向)
    actor_ref.tell({'name': 'World'})

    # アクターを停止する (本当はちょっと待った方が良いかも)
    actor_ref.stop()


if __name__ == '__main__':
    main()

アクターは pykka.ThreadingActor というクラスを継承して作る。 on_receive() というメソッドはアクターがメッセージを受け取って、それがディスパッチされたときに発火するハンドラだ。 サンプルコードでは、そのアクターを起動してメッセージを送りつけている。 アクターは start() メソッドが呼ばれた瞬間からメッセージキューを読み出すスレッドが起動して動き出す。 あとは、アクターに tell() メソッドでメッセージを送ることができる。

また、ポイントとしてはアクターを起動したときに返ってくる内容が ActorRef というアクターへの間接的な参照になっている点だ。 つまり、起動したアクターそのものを直接触ることはできず、必ずこの参照を経由してアクセスすることになる。 直接触ることを防ぐことによって、アクターの中で競合を起こさないようにしているのだろう。

ちなみに、上記のサンプルコードではメッセージを送りつけた直後にアクターを停止しているけど、本当はちょっと待った方がお行儀が良いのかもしれない。 ちょっと待つというのは、具体的にはメインスレッドをスリープさせるということだ。 何故なら、キューからのメッセージの取り出しおよびハンドラの発火は別スレッドで進むため、すぐに実行される保証がないから。 とはいえ、他にやっていることもないからすぐに処理されるはずで、現実的には問題ないんだけど。

上記のサンプルコードを実行すると、次のようになる。

$ python hellopykka.py
Hello, World!

ただのハローワールドだけど、これはアクターモデルで実装されている!

アクターモデルのコンストラクタに引数を渡す

ここからは、アクターモデルというより Pykka の使い方の説明に入っていく。

先ほどのサンプルコードでは、アクターにメッセージの形で引数を渡した。 とはいえ、起動時の初期パラメータを設定したい場合も多いと思う。 そんなときはコンストラクタに引数を用意すれば良い。 親クラスのコンストラクタは必ず呼ぶこと。 というより、呼ばないと例外になる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def __init__(self, greeting='Hello'):
        # 親クラスのコンストラクタは必ず呼ぶ
        super().__init__()

        # 挨拶の内容をカスタマイズできるようにする
        self.greeting = greeting

    def on_receive(self, message):
        name = message['name']
        print('{0}, {1}!'.format(self.greeting, name))


def main():
    # アクターを起動するときに使ったパラメータがコンストラクタに渡される
    actor_ref = GreetingActor.start('Hajimemashite')

    actor_ref.tell({'name': 'Sekai'})

    actor_ref.stop()


if __name__ == '__main__':
    main()

コンストラクタに渡す引数はアクターの起動時に渡す。

上記のサンプルコードを実行した結果は次の通り。

$ python initpykka.py
Hajimemashite, Sekai!

コンストラクタで渡したメッセージが使われている。

返り値のあるメッセージパッシング

これまでの例では、アクターにメッセージを送っておしまいという、一方的なものだった。 アクターに対して一方的にメッセージを送りつけるには tell() メソッドを使う。 とはいえ、何かメッセージを渡して、その結果として返り値を戻してほしいというのはよくある。

そんなときはメッセージを送りつけるときに ask() メソッドを使う。 また、メッセージハンドラでは返り値を戻すように作る。 返り値を受け取る側では、いくつか受け取り方を選ぶことができる。 次のサンプルコードでは、戻り値のあるパターンを実装している。 そして、返り値の受け取り側でもいくつかのやり方を試している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def on_receive(self, message):
        name = message['name']
        # 返り値があると ask() メソッドでその値が得られるようになる
        return 'Hello, {}!'.format(name)


def main():
    actor_ref = GreetingActor.start()

    # ActorRef#ask() を使うとアクターから返り値が得られる (返事があるまでブロックする)
    answer = actor_ref.ask({'name': 'World'})
    print(answer)

    # ブロックする時間にタイムアウトを設定したいときは timeout を指定する
    answer = actor_ref.ask({'name': 'Timeout'}, timeout=1)
    print(answer)

    # 処理をブロックさせたくないときは block=False を指定すると
    # 返り値が Future オブジェクトになる
    future = actor_ref.ask({'name': 'Future'}, block=False)
    # Future オブジェクトから結果を取り出す (ここでブロックする)
    answer = future.get()
    print(answer)

    future = actor_ref.ask({'name': 'Future-Timeout'}, block=False)
    # Future から結果を取り出すときのタイムアウトも指定できる
    answer = future.get(timeout=1)
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記で返り値を受け取るやり方がいくつかあることについて補足しておく。 前述した通り、アクターの内外は非同期の壁で断絶されている。 つまり、メッセージに対する返事が返ってくるタイミングは、将来であることは分かるものの具体的にいつかは分からないのだ。

そのため、返事の受け取り方として「返ってくるまですぐにブロックする」か「とりあえず受理された旨の返事だけ受け取る」やり方を選べるわけだ。 後者では、いわゆる Future を受け取っている。

先ほどのサンプルコードを実行した結果は次の通り。

$ python askpykka.py
Hello, World!
Hello, Timeout!
Hello, Future!
Hello, Future-Timeout!

アクターの中で発生した例外を処理する

もし、アクターの返り値のある処理で例外が発生したことに気づくことができないと大変だ。 Pykka では ActorRef#ask() メソッド経由で呼び出された処理の中で例外が起こったときは、呼び出し元にそれが伝搬するようになっている。

次のサンプルコードではメッセージハンドラの中で意図的に例外を上げている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class ExceptionActor(pykka.ThreadingActor):

    def on_receive(self, message):
        raise Exception('Oops!')


def main():
    actor_ref = ExceptionActor.start()

    try:
        # ちなみに ActorRef.tell() を使うときは、投げっぱなしなので
        # 呼び出し先で例外が起こっても呼び出し元でキャッチできない
        actor_ref.ask({'name': 'World'})
    except Exception as e:
        print(e)

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。

$ python trypykka.py
Oops!

親スレッドが死んだときにアクターのスレッドも停止させたい

ちなみに、先ほどのサンプルコードで例外をキャッチしないでいるとどうなるか試してみただろうか。 実はアプリケーションの実行はそのまま継続することになる。 どうしてそんなことが起こるかというと、子スレッドがデーモンスレッドになっていないためだ。 アプリケーションは全てのスレッドが停止するまで処理を継続することになる。

もし、親スレッドが停止したときに子スレッドも停止させたい、つまり一家心中を図りたいときは次のようにする。 アクターには use_daemon_thread というアトリビュートが用意されているので、そのフラグを True にしよう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class ExceptionActor(pykka.ThreadingActor):
    # 親のスレッドが終了したら、同時に停止させたいときはこうする
    # 親が死んだとき一家心中になるスレッドをデーモンスレッドという
    use_daemon_thread = True

    def on_receive(self, message):
        raise Exception('Oops!')


def main():
    actor_ref = ExceptionActor.start()

    # デーモンスレッドになっていないと、ここで例外になって親スレッドが死ぬ
    actor_ref.ask({'name': 'World'})

    # すると、このコードが実行されないのでアクターのスレッドが停止しない
    actor_ref.stop()


if __name__ == '__main__':
    main()

こうすれば、例外を取りこぼしたことでメインスレッドが停止したときに、そこから起動されたアクターのスレッドも同時に停止する。

$ python daemonpykka.py
Traceback (most recent call last):
  File "daemonpykka.py", line 27, in <module>
    main()
  File "daemonpykka.py", line 20, in main
    actor_ref.ask({'name': 'World'})
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 435, in ask
    return future.get(timeout=timeout)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 52, in get
    compat.reraise(*self._data['exc_info'])
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/compat.py", line 24, in reraise
    raise value
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 304, in _handle_receive
    return self.on_receive(message)
  File "daemonpykka.py", line 13, in on_receive
    raise Exception('Oops!')
Exception: Oops!

アクターのフラグを切り替えて、挙動の違いを確認してみよう。

アクタープロキシを使って処理を呼び出す

これまでの例では pykka.ThreadingActor の持つ on_receive() というメソッドをオーバーライドすることでアクターを実装してきた。 しかし、このやり方はだいぶプリミティブで Pykka の低レイヤーな API を使っている。 実際には、アクタープロキシという機構を使って、アクターのメソッドをあたかも直接呼び出しているような風に処理できる。

次のサンプルコードではアクターで独自のメソッド greeting を定義している。 その呼び出し側では ActorRef から ActorProxy を生成して、あたかもメソッドを直接呼び出しているように扱っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def greeting(self):
        print('Hello, World!')


def main():
    actor_ref = GreetingActor.start()

    # ActorProxy を取得する
    actor_proxy = actor_ref.proxy()

    # ActorProxy では定義されているメソッドを直接呼べる (ように見える)
    actor_proxy.greeting()

    actor_ref.stop()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。

$ python proxypykka.py
Hello, World!

アクタープロキシを使って返り値のある場合

先ほどのサンプルコードでは、アクタープロキシを使って返り値のない場合を扱っていた。 今度は返り値のあるときを試してみる。 アクタープロキシを使って返り値を受け取るときは、実は必ず Future を介すことになる。

次のサンプルコードではアクタープロキシを使った上で返り値を受け取っている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class GreetingActor(pykka.ThreadingActor):

    def greeting(self):
        return 'Hello, World!'


def main():
    actor_ref = GreetingActor.start()
    actor_proxy = actor_ref.proxy()

    # 返り値のある呼び出しでは Future オブジェクトが返る
    future = actor_proxy.greeting()
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

アクタープロキシ経由で実行したメソッドは、必ず Future 経由で返り値を受け取ることになる。

上記の実行結果は次の通り。

$ python futurepykka.py
Hello, World!

アクターのアトリビュートにアクタープロキシ経由でアクセスする

前述した通り、アクターの中と外は非同期の壁で断絶されていることから、直接同期的に触ることはできない。 それをすると、アクターの処理が競合する恐れがあるためだ。

では、アクターのアトリビュートを触るのにも、必ずアクセサに相当するメソッドを定義する必要があるのだろうか? だとすると相当に面倒くさいと思うんだけど、これは定義する必要がない。 アクタープロキシ経由であればアトリビュートも Future 経由にはなるが、触ることができる。

次のサンプルコードではアクターに message というアトリビュートを定義している。 メインスレッドからは、それをアクタープロキシ経由で読み出している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class MyActor(pykka.ThreadingActor):

    def __init__(self):
        super().__init__()
        # アクターでないオブジェクトをアトリビュートとして持つ
        self.message = 'Hello, World!'


def main():
    actor_ref = MyActor.start()
    actor_proxy = actor_ref.proxy()

    # アクターでないアトリビュートにアクセスすると Future が返る
    future = actor_proxy.message
    # 値を取り出す
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

ポイントとしては、アトリビュートへのアクセスであっても Future が返る点だ。 こうすればアクセスが非同期に処理されるため、処理が競合することはない。

上記の処理結果は次の通り。

$ python attrpykka.py
Hello, World!

アクターのメンバが持つメソッドをアクタープロキシ経由で呼び出す

先ほどのサンプルコードではアクターのメンバがただの文字列だった。 次は、これがメソッドを持ったインスタンスだったときの話。

次のサンプルコードでは、アクター MyActor がアクターでないアトリビュート NonActor のインスタンスをメンバに持っている。 それを、アクタープロキシ経由で外から呼び出している。 この場合、メンバとなるアクターでないオブジェクトには pykka_traversable というフラグが必要になる。 このフラグを立てることで、アクタープロキシから呼び出すことができるようになる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class NonActor(object):
    """アクターのアトリビュートになる非アクタークラス"""
    # pykka_traversable アトリビュートを True にするとプロキシ経由でアクセスできるようになる
    pykka_traversable = True

    def greeting(self):
        """プロキシ経由で外部から呼びたいメソッド"""
        return 'Hello, World!'


class MyActor(pykka.ThreadingActor):

    def __init__(self):
        super().__init__()
        # アクターでないオブジェクトをアトリビュートとして持つ
        self.non_actor = NonActor()


def main():
    actor_ref = MyActor.start()
    actor_proxy = actor_ref.proxy()

    # アクターのアトリビュートのオブジェクトが持つメソッドを呼び出せるようになる
    future = actor_proxy.non_actor.greeting()
    # Future が返ってくるので値を取り出す
    answer = future.get()
    print(answer)

    actor_ref.stop()


if __name__ == '__main__':
    main()

このパターンでも返り値は Future になる。

上記のサンプルコードの実行結果は次の通り。

$ python methodpykka.py
Hello, World!

アクターモデルを使った並行処理

これまでのサンプルコードでは、アクターは定義しているものの平行処理をしていなかった。 なので、説明の都合上、仕方なかったとはいえ一体何のためにこんなことを、という印象だったかもしれない。 ここからは、ようやくアクターモデルを使って並行処理をしてみる。

アクターモデルの並行処理では、基本的にアクターをたくさん用意してタスクをそれぞれのアクターに振り分けていく。 アクターには、それぞれメッセージを処理するスレッドが起動しているので、処理をバラバラに並列で進めることができるわけ。

次のサンプルコードでは、 アクター MyActor に擬似的に処理に時間のかかるメソッド process() を用意している。 時間がかかる、というところはスレッドをスリープすることで表現した。 ここでは、時間のかかる処理を 4 回呼び出している。 それぞれの処理には 1 秒かかる。 一つのアクターで直列に処理すれば、単純に 4 秒かかるはずの処理を、ここでは二つのアクターに振り分けている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id, sleep_sec):
        """時間がかかる処理を模したメソッド"""
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1} s'.format(processing_id, sleep_sec))


def main():
    # アクターを二つ起動する
    actor_proxy_a = MyActor.start().proxy()
    actor_proxy_b = MyActor.start().proxy()

    # 処理を開始した時刻を記録しておく
    start_time = time.time()

    # それぞれのアクターに仕事を二つずつ割り当てる
    # 処理はアクターのスレッドがキューから取り出して開始する
    future1 = actor_proxy_a.process(1, 1)
    future2 = actor_proxy_a.process(2, 1)

    future3 = actor_proxy_b.process(3, 1)
    future4 = actor_proxy_b.process(4, 1)

    # 計算が終わるのを待つ (threading#join() のようなものと考えると分かりやすい)
    future1.get()
    future2.get()
    future3.get()
    future4.get()

    # 処理が終わったら時刻を記録する
    end_time = time.time()

    # 一連の処理にかかった時間を出力する
    elapsed_time = end_time - start_time
    print('Elapsed Time: {0} s'.format(elapsed_time))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

さて、処理が終わるのに実際は何秒かかるだろうか?

上記のサンプルコードの実行結果は次の通り。 二つのアクターで処理した結果として本来一つのアクターなら 4 秒かかるところ 2 秒で完了している。

$ python cuncurrentpykka.py
Completed: ID 3 in 1 s
Completed: ID 1 in 1 s
Completed: ID 2 in 1 s
Completed: ID 4 in 1 s
Elapsed Time: 2.0071158409118652 s

アクターモデルでも設計によっては競合が起こる

アクターモデルを使えば、競合が起きないというような説明をされることがたまにあるようだ。 しかし、その「競合が起きない」というのは、より低レイヤーな側面からアクターモデルを見たときの話だと思う。 あくまで、アクターのメッセージキューを処理するスレッドが一つだけなので、アクターの中で処理が競合しないというのに過ぎない。 より高レイヤーに、アプリケーションという側面からアクターモデルを見たとき、競合は容易に起こる。 それぞれの競合は全く別の次元の話なんだけど、もし混ぜて考えてしまうと危険だ。 次は、その競合について見てみよう。

サンプルコードでは、銀行口座を模したモデルを用意した。 AccountActor という銀行口座を模したアクターを UserActor というユーザを模したアクターが操作する。 UserActor はまず銀行口座から預金残高を読み取って、その値を元に預金残高を更新する。 ここでポイントなのは AccountActor の API がアトミックになっていないという点だ。 つまり、銀行口座の読み取りから更新までの間に別のアクターからの処理が割り込む余地がある。 サンプルコードでは、口座の読み取りから更新までの間に意図的にスリープを入れることで競合が起こりやすいようにしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

import pykka


class AccountActor(pykka.ThreadingActor):
    """銀行口座を模したアクター"""

    def __init__(self, initial_money=0):
        super(AccountActor, self).__init__()

        self._money = initial_money

    def get_balance(self):
        # 口座の残高を得る
        return self._money

    def set_balance(self, money):
        # 口座の残高を設定する
        self._money = money


class UserActor(pykka.ThreadingActor):
    """銀行口座を使うユーザを模したアクター"""

    def __init__(self, username, account_actor):
        super(UserActor, self).__init__()

        self._username = username
        self._account_actor = account_actor

    def save(self, amount):
        # 残高を読み出す
        future = self._account_actor.get_balance()
        balance = future.get()
        print('{0} read balance: {1}'.format(self._username, balance))
        # 残高を読んでから次の処理に移るのに時間がかかった!
        time.sleep(1)
        print('{0} sleep...'.format(self._username))
        # 口座残高を更新する
        new_balance = balance + amount
        future = self._account_actor.set_balance(new_balance)
        future.get()
        print('{0} set balance: {1}'.format(self._username, new_balance))


def main():
    # アクターを起動する
    account_actor_ref = AccountActor.start(100)
    account_actor_proxy = account_actor_ref.proxy()

    user_a_ref = UserActor.start('UserA', account_actor_proxy)
    user_a_proxy = user_a_ref.proxy()

    user_b_ref = UserActor.start('UserB', account_actor_proxy)
    user_b_proxy = user_b_ref.proxy()

    # ユーザ A と B が同時に口座に 50 を入金する
    future_a = user_a_proxy.save(50)
    future_b = user_b_proxy.save(50)

    future_a.get()
    future_b.get()

    # 最終的な口座の残高を出力する
    print('Final Balance: {0}'.format(account_actor_proxy.get_balance().get()))

    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

二つのユーザを模したアクターは銀行残高をそれぞれ 50 だけ増やすべく処理を実施する。 初期の預金残高は 100 なので、最終的には 200 になっていないとおかしい。

では、上記のサンプルコードを実行してみよう。

$ python conflictpykka.py
UserA read balance: 100
UserB read balance: 100
UserA sleep...
UserB sleep...
UserA set balance: 150
UserB set balance: 150
Final Balance: 150

最終的な口座残高は 150 となってしまった。 見事に競合が発生している。

問題は、アクターの API が更新に対してアトミックになっていなかった点だ。 次の改良したサンプルコードでは、預金残高を更新するメソッドを update_balance() という形でアトミックにしている。 現在の預金残高を元に、口座のアクター自体が更新をかけるので、間に別のアクターの処理が入り込む余地はない。 アクターの中はメッセージキューを処理する一つのスレッドで処理されているので、同時に同じメソッドが走る心配はない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pykka


class AccountActor(pykka.ThreadingActor):

    def __init__(self, initial_money=0):
        super(AccountActor, self).__init__()

        self._money = initial_money

    def get_balance(self):
        return self._money

    def update_balance(self, money):
        # アトミックに口座残高を更新する
        self._money = self._money + money


class UserActor(pykka.ThreadingActor):

    def __init__(self, username, account_actor):
        super(UserActor, self).__init__()

        self._username = username
        self._account_actor = account_actor

    def save(self, amount):
        # 口座残高を更新する
        future = self._account_actor.update_balance(amount)
        future.get()
        print('{0} update balance: {1}'.format(self._username, amount))


def main():
    # アクターを起動する
    account_actor_ref = AccountActor.start(100)
    account_actor_proxy = account_actor_ref.proxy()

    user_a_ref = UserActor.start('UserA', account_actor_proxy)
    user_a_proxy = user_a_ref.proxy()

    user_b_ref = UserActor.start('UserB', account_actor_proxy)
    user_b_proxy = user_b_ref.proxy()

    # ユーザ A と B が同時に口座に 50 を入金する
    future_a = user_a_proxy.save(50)
    future_b = user_b_proxy.save(50)

    future_a.get()
    future_b.get()

    # 最終的な口座の残高を出力する
    print('Final Balance: {0}'.format(account_actor_proxy.get_balance().get()))

    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行した結果は次の通り。 今度はちゃんと預金残高が 200 に更新されている。

$ python conflictpykka2.py
UserA update balance: 50
UserB update balance: 50
Final Balance: 200

アクターモデルでも設計によってはデッドロックが起こる

典型的なデッドロックは、マルチスレッドでリソースをロックする順番が不定になっていたときに起こる。 アクターモデルはロックを使わないので、それならデッドロックが起こらないかというと、そんなことはない。

次のサンプルコードではデッドロックを発生させている。 ActorAActorB のメソッドを呼んで、ActorBActorA のメソッドを呼ぶ、というように操作が交差している。 それぞれのメソッドは各アクターのスレッドで非同期に処理されるので、返り値で得られる Future を get() すると完了するまでブロックする。 すなわち、双方がお互いの処理の完了を待ってデッドロックを起こしてしまう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import signal
import os

import pykka
from pykka import debug


class ActorA(pykka.ThreadingActor):

    def method_a(self, actor_b_proxy):
        # 2. ActorB の method_b を呼ぶ
        future = actor_b_proxy.method_b()
        # 3. Future の中身を取り出そうとすると ActorA のスレッドは取り出せるまでブロックする
        future.get()


class ActorB(pykka.ThreadingActor):

    def __init__(self, actor_a_proxy):
        super(ActorB, self).__init__()
        self.actor_a_proxy = actor_a_proxy

    def method_b(self):
        # 4. ActorA の method_a を呼ぶ (本当は自身のプロキシを渡す必要があるけど)
        future = self.actor_a_proxy.method_a()
        # 5. Future の中身を取り出そうとすると ActorB のスレッドは取り出せるまでブロックする
        future.get()
        # XXX: ActorA のスレッドは既にブロックしているのでデッドロックを起こす


def main():
    """意図的にデッドロックを起こしてみる"""

    # デバッグレベルのログが出力されるようにする
    logging.basicConfig(level=logging.DEBUG)

    # シグナル SIGUSR1 でスレッドのトレースバックを出力する
    signal.signal(signal.SIGUSR1, debug.log_thread_tracebacks)

    # アクターを起動する
    actor_a_ref = ActorA.start()
    actor_a_proxy = actor_a_ref.proxy()
    # ActorB には ActorA のプロキシを渡しておく
    actor_b_ref = ActorB.start(actor_a_proxy)
    actor_b_proxy = actor_b_ref.proxy()

    # 1. ActorA の method_a を呼ぶ (引数として ActorB を渡す)
    actor_a_proxy.method_a(actor_b_proxy)

    # メインスレッドでは PID を出力する
    pid = os.getpid()
    print(f'PID: {pid}')

    # アクターがデーモンスレッドではないので、ここまできてもプログラムは終了しない


if __name__ == '__main__':
    main()

上記を実行すると、何も表示されずに処理が止まる。

$ python deadlockpykka.py
DEBUG:pykka:Registered ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Starting ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Registered ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
DEBUG:pykka:Starting ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
PID: 9199

サンプルコードでプロセス ID を表示するようにしているところに気づいただろうか? また Pykka の debug.log_thread_tracebacks という関数がシグナル SIGUSR1 に対して登録されている。 そこで、別のターミナルからこのハンドラに対してシグナルを送ってみよう。

$ kill -SIGUSR1 9199

すると、今のスレッドが何処を処理しているのかトレースバックが表示される。 この表示からは、二つのアクターのスレッドが future.get() でブロックしていることが分かる。

$ python deadlockpykka.py
DEBUG:pykka:Registered ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Starting ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Registered ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
DEBUG:pykka:Starting ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
PID: 9199
CRITICAL:pykka:Current state of ActorB-2 (ident: 123145451573248):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "deadlockpykka.py", line 31, in method_b
    future.get()
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 50, in get
    self._data = self._queue.get(True, timeout)
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

CRITICAL:pykka:Current state of ActorA-1 (ident: 123145446318080):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "deadlockpykka.py", line 18, in method_a
    future.get()
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 50, in get
    self._data = self._queue.get(True, timeout)
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

CRITICAL:pykka:Current state of MainThread (ident: 140736596534208):
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1290, in _shutdown
    t.join()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/debug.py", line 68, in log_thread_tracebacks
    stack = ''.join(traceback.format_stack(frame))

Pykka の並行処理に関しては、上記のようにしてデバッグができる。

スレッドプールっぽいものを作って並行処理をしてみる

次はもうちょっと実践的に、スレッドプールっぽく並行処理をしてみよう。 とはいえ、あまり汎用的には作らずベタ書きだけど。

次のサンプルコードでは、先ほどの例と同じように MyActor に処理に時間のかかるメソッド process() を用意している。 アクターを配列で 10 個生成した上で、タスクを 20 回呼び出してみよう。 ただし、通常のスレッドプールのような形で終わったものから新しいタスクを割り当てる、というものではない。 あらかじめ、各アクターに決まった番号のタスクを割り当てるような形になっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time
import random

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id):
        """時間がかかる処理を模したメソッド"""
        # 処理時間をランダムに用意する
        sleep_sec = random.randint(1, 10) / 10
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1} s'.format(processing_id, sleep_sec))
        # 処理の ID を返す
        return processing_id


def main():
    # スレッドプールのようにアクターを用意する
    actor_proxy_pool = [MyActor.start().proxy() for _ in range(10)]

    # それぞれのアクターに仕事を割り当てていく
    # かかる時間はランダムにする
    futures = [
        actor_proxy_pool[i % len(actor_proxy_pool)].process(i)
        for i in range(20)
    ]

    # すべての処理が完了するのを待ち合わせる
    answers = pykka.get_all(futures)

    print('*** Calculation Completed ***')

    # 完了した結果を出力する
    for answer in answers:
        print('ID: {0}'.format(answer))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

実行結果は次の通り。 各タスクが順番バラバラで実行されているのが分かる。

$ python tpoolpykka.py
Completed: ID 3 in 0.1 s
Completed: ID 0 in 0.3 s
Completed: ID 7 in 0.3 s
Completed: ID 13 in 0.2 s
Completed: ID 1 in 0.5 s
Completed: ID 4 in 0.5 s
Completed: ID 5 in 0.6 s
Completed: ID 2 in 0.7 s
Completed: ID 8 in 0.7 s
Completed: ID 10 in 0.4 s
Completed: ID 17 in 0.4 s
Completed: ID 6 in 0.8 s
Completed: ID 9 in 1.0 s
Completed: ID 15 in 0.5 s
Completed: ID 11 in 0.6 s
Completed: ID 12 in 0.5 s
Completed: ID 16 in 0.5 s
Completed: ID 14 in 0.9 s
Completed: ID 19 in 0.5 s
Completed: ID 18 in 0.9 s
*** Calculation Completed ***
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9
ID: 10
ID: 11
ID: 12
ID: 13
ID: 14
ID: 15
ID: 16
ID: 17
ID: 18
ID: 19

出力の最後を見て分かる通り、実行結果として得られる配列の順番は変わっていない。 これは、単純に内部でやっているのが順番に Future#get() をしていく、という処理だからだ。 とはいえ、最終的に得られる結果の順番が入れ替わらないという特性は並行処理をする上で優れた特徴だと思う。 場合によっては終わったものから返ってきて、後からソートし直さないといけないことなんかもあるし。

ラウンドロビンで処理するメッセージルータを実装してみる

次は、お試しで作ってみたメッセージルータについて。 これはほんとに試しに作ってみただけなのでプルーフオブコンセプトと思ってほしい。

次のサンプルコードでは MyActor をラップするメッセージルータの RoundRobinRouter を定義している。 RoundRobinRouter では、内部的に MyActor のプロキシを複数生成して、処理を振り分ける。 先ほどのべた書きのスレッドプールを、もうちょっと洗練させた感じ。 RoundRobinRouter は、まず処理をフォワードしたいアクターと同時並行数 (アクターをいくつ生成するか) を指定して起動する。 その上で RoundRobinRouter#forward() メソッドに呼び出したいメソッド名と引数を指定するだけ。 注意点としては、返り値が二重の Future になっているところ。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division

import time
import random

import pykka


class MyActor(pykka.ThreadingActor):

    def process(self, processing_id):
        """時間がかかる処理を模したメソッド"""
        # 処理時間をランダムに用意する
        sleep_sec = random.randint(1, 10) / 10
        # time.sleep() を使って時間がかかる処理をエミュレートする
        time.sleep(sleep_sec)
        # 処理が終わったら内容を出力する
        print('Completed: ID {0} in {1}s by {2}'.format(
            processing_id,
            sleep_sec,
            self.actor_urn,
        ))
        # 処理の ID を返す
        return processing_id


class RoundRobinRouter(pykka.ThreadingActor):
    """ラップしたアクターにラウンドロビンで処理を割り当てていくメッセージルータ"""

    def __init__(self, actor_type, size):
        super(RoundRobinRouter, self).__init__()

        # アクターのプロキシを生成しておく
        self._proxy_pool = [actor_type.start().proxy()
                            for _ in range(size)]
        # 次に処理させるアクターを決めるジェネレータ
        self._index_gen = self._next_actor_index()

    def forward(self, name, *args, **kwargs):
        # 次の処理させるアクターのインデックス
        actor_index = next(self._index_gen)
        # 次の処理させるアクターのプロキシ
        actor_proxy = self._proxy_pool[actor_index]
        # フォワード対象のアトリビュートをプロキシから取り出す
        attribute = getattr(actor_proxy, name)
        # 引数を渡して呼び出す
        return attribute(*args, **kwargs)

    def _next_actor_index(self):
        """次の処理対象のアクターのインデックスを生成するジェネレータ"""
        i = 0
        while True:
            yield i
            # 0, 1, 2, 3 ... 0, 1, 2, 3 と繰り返す
            i = (i + 1) % len(self._proxy_pool)


def main():
    # 五つの MyActor をラップしたルータを生成する
    router_ref = RoundRobinRouter.start(MyActor, 5)
    router_proxy = router_ref.proxy()

    futures = [router_proxy.forward('process', i)
               for i in range(10)]

    # このルータを経由させると Future の二段重ねになる
    inner_futures = pykka.get_all(futures)
    answers = pykka.get_all(inner_futures)

    print('*** Calculation Completed ***')

    for answer in answers:
        print('ID: {0}'.format(answer))

    # すべてのアクターを停止させる
    pykka.ActorRegistry.stop_all()


if __name__ == '__main__':
    main()

上記の実行結果は次の通り。 タスクを処理したアクターの識別子も同時に出力するようになっている。 見ると、タスクの識別子が 5 つの周期で同じアクターによって処理されていることが分かる。 上手く動作しているようだ。

$ python routerpykka.py
Completed: ID 1 in 0.3s by urn:uuid:a5d3f2e6-6d59-4351-9be8-e47023428e99
Completed: ID 2 in 0.6s by urn:uuid:ee6f93cf-66df-4500-b416-84defff8b195
Completed: ID 3 in 0.6s by urn:uuid:c6003463-87fc-4988-b8bb-34186e867630
Completed: ID 0 in 0.7s by urn:uuid:9a2fe8bc-49fe-4b19-96d5-6658ecae3677
Completed: ID 7 in 0.1s by urn:uuid:ee6f93cf-66df-4500-b416-84defff8b195
Completed: ID 4 in 0.8s by urn:uuid:0e3b5668-6825-43f5-bd95-9cf2c706a03d
Completed: ID 6 in 0.5s by urn:uuid:a5d3f2e6-6d59-4351-9be8-e47023428e99
Completed: ID 9 in 0.3s by urn:uuid:0e3b5668-6825-43f5-bd95-9cf2c706a03d
Completed: ID 5 in 0.8s by urn:uuid:9a2fe8bc-49fe-4b19-96d5-6658ecae3677
Completed: ID 8 in 1.0s by urn:uuid:c6003463-87fc-4988-b8bb-34186e867630
*** Calculation Completed ***
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9

まとめ

今回は Python でアクターモデルを実装するためのフレームワークである Pykka を試してみた。

アクターモデルというのは、並行処理のプログラミングモデルの一つ。 ただし、マルチスレッドやマルチプロセスよりも、抽象度の高い概念になっている。 対比するとしたら共有メモリやロック、RPC などがそれに当たると思う。

アクターモデルの概念を理解する上で重要なのは、アクターを構成する三つの要素を理解すること。 一つ目がメッセージキュー、二つ目がメッセージハンドラ、三つ目がそれらをディスパッチするスレッド。 生成されたアクターには、それぞれにこの三つの要素が必ず備わっている。 メッセージキューとスレッドはあらかじめアクターごとに一つずつ生成されるし、メッセージハンドラは自分で定義する。

アクターモデルでは、内部で競合を起こさないようにアクターの内外が非同期の壁で分断されている。 アクターの内部に、外側から同期的に直接触れることはできない。 必ず、触るとしたら Future を経由することになる。 同期的に触っているように見えたとしても、内部的には必ず経由している。

アクターモデルで並行処理を実現するには、アクターを横にたくさん並べてタスクを割り当てていく。 ただし、それぞれのタスクはアトミックに作られていないといけない。 アクターモデルで競合が起こらないというのは、あくまで「低レイヤーで内部的には」という但し書きがつく。 また、ロックがないからといって、デッドロックが起こらないというわけでもない。

Pykka の場合、Akka の機能を全て揃えているわけではないので、割りとよく使いそうな機能でも自分で書く必要がある。 また、正直アプリケーションで直接 Pykka を使うというシチュエーションは、あまり無いのかもしれないと思った。 なぜなら Pykka の提供する API は、直接使うにはあまりに低レイヤーすぎるからだ。 なので、Pykka をベースにした何らかのフレームワークをまず書いて、その上でアプリケーションを組むことになりそうな印象を受けた。

いじょう。