アクターモデルというのは、並行処理のプログラミングモデルの一つだ。
並行処理という言葉からは、まずマルチスレッドとかをイメージすると思うけど、それよりも抽象度の高い概念となっている。
つまり、アクターモデルというのはマルチスレッドなどを用いて構築することになる。
どちらかといえばプロセス間通信 (IPC) の技法であって、共有メモリやロック、RPC と比較するものかもしれない。
そんなアクターモデルは、概念とか使ったときの嬉しさを理解・実感するのがなかなか難しいモデルだとも思う。
理由としては、使い始めるまでに必要なコード量が多かったり、それなりの規模のアプリケーションで使わないとメリットが分かりづらい点が挙げられる。
ただ、これはあくまで主観的なものだけど、アクターモデルをベースに組まれたアプリケーションは規模が大きくなっても並行処理をしているコードが読みやすい。
共有メモリやロックを使う場合だと、平行処理できるパートの量と可読性がトレードオフになりがちな気がするのとは対照的な感じがしている。
上記の理由としては、それぞれの手法のアプローチが次のように根本的に異なるためだと思う。
- 共有メモリやロックを使うやり方
- 考え方のベースはシングルスレッド
- ボトルネックになっている末端の処理を局所的に並行にしていく
- アクターモデル
- 考え方のベースはマルチスレッド
- 末端の処理はシングルスレッドで処理される
- ただし、アクターを横に並べることで並行度を上げられる
今回は、そんなアクターモデルを Python で実装するためのフレームワークである Pykka を使ってみることにする。
ざっと調べた感じ、この Pykka が Python で汎用的なアクターモデルを実装するためのパッケージとしては最も有名そうだった。
Pykka は Scala/Java でアクターモデルを実装するためのフレームワークである Akka の影響を受けている。
使われている用語の多くも Akka から取られているらしいけど、まあ使う分には特に意識する必要はない。
ただし、Pykka は単純に Akka を Python で再実装したものではない。
次のコンポーネントは明示的にサポートしないことが宣言されている。
もし欲しければ自分で作り込まなきゃいけない。
- スーパーバイザ
- メッセージルータ
- 並行処理を書きやすくしたりアプリケーションの骨子を作るためのやつ
- ネットワーク越しの分散処理
- 並行処理のタスクをネットワーク越しの各ホストに分配するやつ
今回使った環境は次の通り。
$ sw_vers
ProductName: Mac OS X
ProductVersion: 10.12.3
BuildVersion: 16D32
$ python --version
Python 3.6.0
インストール
インストールは Python のパッケージマネージャである pip を使ってさくっとできる。
$ pip install pykka
もし pip が入っていないときはインストールしよう。
$ curl https://bootstrap.pypa.io/get-pip.py | sudo python
あと、システムの実行環境を汚さないように Python 仮想環境を使えるようにしておくのがおすすめ。
これには例えば virtualenv なんかを使う。
$ sudo pip install virtualenv
そこらへんはお好みで。
はじめてのアクターモデル
アクターモデルでは、その名の通りアクターというコンポーネントを組み合わせてアプリケーションを構築していく。
アクターという概念を構成する要素は、実のところたったの三つしかない。
まず、他のアクターから何らかのデータを受け取るために用意されたメッセージキューが一つ目。
そして、その受け取ったデータを渡す先となるメッセージハンドラが二つ目。
最後に、メッセージキューからデータを取り出して、それをメッセージハンドラにディスパッチする役目のスレッドが三つ目だ。
この三つの要素が、それぞれのアクターインスタンスに必ず備わっている。
メッセージキューとスレッドはデフォルトで一つずつ、メッセージハンドラについては自分で何をするか記述する。
重要な点としては、アクター同士はメモリなどを共有することがない。
することがないというよりも、する必要がない・するとアクターモデルを使う意味が薄れるという方が正しい。
アクターモデルでは、何かを共有することなくお互いのメッセージキューにメッセージを放り込み合うことで情報をやり取りして処理を進めていく。
いわゆるメッセージパッシングと呼ばれるものだ。
また、アクターの中と外の世界は非同期の壁で断絶されている。
アクターの中身を、その外側から同期的に直接触ることはできない。
触ることができてしまうと、処理の競合が起こってしまうからだ。
アクターの中を触るには、いわゆる Future を必ず介すことになる。
処理はキューイングされていつかは実行されるけど、具体的にそれがいつかは分からない状態でアクセスする。
説明が長くなったけど、そろそろ具体的なサンプルコードを見ていくことにしよう。
以下は Pykka を使って作る、最も単純なアプリケーションの一つだ。
このアプリケーションではメッセージを受け取って、それを出力するだけのアクターを定義している。
説明はコードの中にコメントとして記述している。
import pykka
class GreetingActor(pykka.ThreadingActor):
"""メッセージを受け取って挨拶をするアクター"""
def on_receive(self, message):
"""メッセージが届いたときに実行されるハンドラ"""
name = message['name']
print('Hello, {0}!'.format(name))
def main():
actor_ref = GreetingActor.start()
actor_ref.tell({'name': 'World'})
actor_ref.stop()
if __name__ == '__main__':
main()
アクターは pykka.ThreadingActor
というクラスを継承して作る。
on_receive()
というメソッドはアクターがメッセージを受け取って、それがディスパッチされたときに発火するハンドラだ。
サンプルコードでは、そのアクターを起動してメッセージを送りつけている。
アクターは start()
メソッドが呼ばれた瞬間からメッセージキューを読み出すスレッドが起動して動き出す。
あとは、アクターに tell()
メソッドでメッセージを送ることができる。
また、ポイントとしてはアクターを起動したときに返ってくる内容が ActorRef
というアクターへの間接的な参照になっている点だ。
つまり、起動したアクターそのものを直接触ることはできず、必ずこの参照を経由してアクセスすることになる。
直接触ることを防ぐことによって、アクターの中で競合を起こさないようにしているのだろう。
ちなみに、上記のサンプルコードではメッセージを送りつけた直後にアクターを停止しているけど、本当はちょっと待った方がお行儀が良いのかもしれない。
ちょっと待つというのは、具体的にはメインスレッドをスリープさせるということだ。
何故なら、キューからのメッセージの取り出しおよびハンドラの発火は別スレッドで進むため、すぐに実行される保証がないから。
とはいえ、他にやっていることもないからすぐに処理されるはずで、現実的には問題ないんだけど。
上記のサンプルコードを実行すると、次のようになる。
$ python hellopykka.py
Hello, World!
ただのハローワールドだけど、これはアクターモデルで実装されている!
アクターモデルのコンストラクタに引数を渡す
ここからは、アクターモデルというより Pykka の使い方の説明に入っていく。
先ほどのサンプルコードでは、アクターにメッセージの形で引数を渡した。
とはいえ、起動時の初期パラメータを設定したい場合も多いと思う。
そんなときはコンストラクタに引数を用意すれば良い。
親クラスのコンストラクタは必ず呼ぶこと。
というより、呼ばないと例外になる。
import pykka
class GreetingActor(pykka.ThreadingActor):
def __init__(self, greeting='Hello'):
super().__init__()
self.greeting = greeting
def on_receive(self, message):
name = message['name']
print('{0}, {1}!'.format(self.greeting, name))
def main():
actor_ref = GreetingActor.start('Hajimemashite')
actor_ref.tell({'name': 'Sekai'})
actor_ref.stop()
if __name__ == '__main__':
main()
コンストラクタに渡す引数はアクターの起動時に渡す。
上記のサンプルコードを実行した結果は次の通り。
$ python initpykka.py
Hajimemashite, Sekai!
コンストラクタで渡したメッセージが使われている。
返り値のあるメッセージパッシング
これまでの例では、アクターにメッセージを送っておしまいという、一方的なものだった。
アクターに対して一方的にメッセージを送りつけるには tell()
メソッドを使う。
とはいえ、何かメッセージを渡して、その結果として返り値を戻してほしいというのはよくある。
そんなときはメッセージを送りつけるときに ask()
メソッドを使う。
また、メッセージハンドラでは返り値を戻すように作る。
返り値を受け取る側では、いくつか受け取り方を選ぶことができる。
次のサンプルコードでは、戻り値のあるパターンを実装している。
そして、返り値の受け取り側でもいくつかのやり方を試している。
import pykka
class GreetingActor(pykka.ThreadingActor):
def on_receive(self, message):
name = message['name']
return 'Hello, {}!'.format(name)
def main():
actor_ref = GreetingActor.start()
answer = actor_ref.ask({'name': 'World'})
print(answer)
answer = actor_ref.ask({'name': 'Timeout'}, timeout=1)
print(answer)
future = actor_ref.ask({'name': 'Future'}, block=False)
answer = future.get()
print(answer)
future = actor_ref.ask({'name': 'Future-Timeout'}, block=False)
answer = future.get(timeout=1)
print(answer)
actor_ref.stop()
if __name__ == '__main__':
main()
上記で返り値を受け取るやり方がいくつかあることについて補足しておく。
前述した通り、アクターの内外は非同期の壁で断絶されている。
つまり、メッセージに対する返事が返ってくるタイミングは、将来であることは分かるものの具体的にいつかは分からないのだ。
そのため、返事の受け取り方として「返ってくるまですぐにブロックする」か「とりあえず受理された旨の返事だけ受け取る」やり方を選べるわけだ。
後者では、いわゆる Future を受け取っている。
先ほどのサンプルコードを実行した結果は次の通り。
$ python askpykka.py
Hello, World!
Hello, Timeout!
Hello, Future!
Hello, Future-Timeout!
アクターの中で発生した例外を処理する
もし、アクターの返り値のある処理で例外が発生したことに気づくことができないと大変だ。
Pykka では ActorRef#ask()
メソッド経由で呼び出された処理の中で例外が起こったときは、呼び出し元にそれが伝搬するようになっている。
次のサンプルコードではメッセージハンドラの中で意図的に例外を上げている。
import pykka
class ExceptionActor(pykka.ThreadingActor):
def on_receive(self, message):
raise Exception('Oops!')
def main():
actor_ref = ExceptionActor.start()
try:
actor_ref.ask({'name': 'World'})
except Exception as e:
print(e)
actor_ref.stop()
if __name__ == '__main__':
main()
上記のサンプルコードを実行した結果は次の通り。
$ python trypykka.py
Oops!
親スレッドが死んだときにアクターのスレッドも停止させたい
ちなみに、先ほどのサンプルコードで例外をキャッチしないでいるとどうなるか試してみただろうか。
実はアプリケーションの実行はそのまま継続することになる。
どうしてそんなことが起こるかというと、子スレッドがデーモンスレッドになっていないためだ。
アプリケーションは全てのスレッドが停止するまで処理を継続することになる。
もし、親スレッドが停止したときに子スレッドも停止させたい、つまり一家心中を図りたいときは次のようにする。
アクターには use_daemon_thread
というアトリビュートが用意されているので、そのフラグを True
にしよう。
#!/usr/bin/env python
import pykka
class ExceptionActor(pykka.ThreadingActor):
use_daemon_thread = True
def on_receive(self, message):
raise Exception('Oops!')
def main():
actor_ref = ExceptionActor.start()
actor_ref.ask({'name': 'World'})
actor_ref.stop()
if __name__ == '__main__':
main()
こうすれば、例外を取りこぼしたことでメインスレッドが停止したときに、そこから起動されたアクターのスレッドも同時に停止する。
$ python daemonpykka.py
Traceback (most recent call last):
File "daemonpykka.py", line 27, in <module>
main()
File "daemonpykka.py", line 20, in main
actor_ref.ask({'name': 'World'})
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 435, in ask
return future.get(timeout=timeout)
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 52, in get
compat.reraise(*self._data['exc_info'])
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/compat.py", line 24, in reraise
raise value
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
response = self._handle_receive(message)
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 304, in _handle_receive
return self.on_receive(message)
File "daemonpykka.py", line 13, in on_receive
raise Exception('Oops!')
Exception: Oops!
アクターのフラグを切り替えて、挙動の違いを確認してみよう。
アクタープロキシを使って処理を呼び出す
これまでの例では pykka.ThreadingActor
の持つ on_receive()
というメソッドをオーバーライドすることでアクターを実装してきた。
しかし、このやり方はだいぶプリミティブで Pykka の低レイヤーな API を使っている。
実際には、アクタープロキシという機構を使って、アクターのメソッドをあたかも直接呼び出しているような風に処理できる。
次のサンプルコードではアクターで独自のメソッド greeting
を定義している。
その呼び出し側では ActorRef
から ActorProxy
を生成して、あたかもメソッドを直接呼び出しているように扱っている。
import pykka
class GreetingActor(pykka.ThreadingActor):
def greeting(self):
print('Hello, World!')
def main():
actor_ref = GreetingActor.start()
actor_proxy = actor_ref.proxy()
actor_proxy.greeting()
actor_ref.stop()
if __name__ == '__main__':
main()
上記のサンプルコードを実行した結果は次の通り。
$ python proxypykka.py
Hello, World!
アクタープロキシを使って返り値のある場合
先ほどのサンプルコードでは、アクタープロキシを使って返り値のない場合を扱っていた。
今度は返り値のあるときを試してみる。
アクタープロキシを使って返り値を受け取るときは、実は必ず Future を介すことになる。
次のサンプルコードではアクタープロキシを使った上で返り値を受け取っている。
import pykka
class GreetingActor(pykka.ThreadingActor):
def greeting(self):
return 'Hello, World!'
def main():
actor_ref = GreetingActor.start()
actor_proxy = actor_ref.proxy()
future = actor_proxy.greeting()
answer = future.get()
print(answer)
actor_ref.stop()
if __name__ == '__main__':
main()
アクタープロキシ経由で実行したメソッドは、必ず Future 経由で返り値を受け取ることになる。
上記の実行結果は次の通り。
$ python futurepykka.py
Hello, World!
アクターのアトリビュートにアクタープロキシ経由でアクセスする
前述した通り、アクターの中と外は非同期の壁で断絶されていることから、直接同期的に触ることはできない。
それをすると、アクターの処理が競合する恐れがあるためだ。
では、アクターのアトリビュートを触るのにも、必ずアクセサに相当するメソッドを定義する必要があるのだろうか?
だとすると相当に面倒くさいと思うんだけど、これは定義する必要がない。
アクタープロキシ経由であればアトリビュートも Future 経由にはなるが、触ることができる。
次のサンプルコードではアクターに message
というアトリビュートを定義している。
メインスレッドからは、それをアクタープロキシ経由で読み出している。
import pykka
class MyActor(pykka.ThreadingActor):
def __init__(self):
super().__init__()
self.message = 'Hello, World!'
def main():
actor_ref = MyActor.start()
actor_proxy = actor_ref.proxy()
future = actor_proxy.message
answer = future.get()
print(answer)
actor_ref.stop()
if __name__ == '__main__':
main()
ポイントとしては、アトリビュートへのアクセスであっても Future が返る点だ。
こうすればアクセスが非同期に処理されるため、処理が競合することはない。
上記の処理結果は次の通り。
$ python attrpykka.py
Hello, World!
アクターのメンバが持つメソッドをアクタープロキシ経由で呼び出す
先ほどのサンプルコードではアクターのメンバがただの文字列だった。
次は、これがメソッドを持ったインスタンスだったときの話。
次のサンプルコードでは、アクター MyActor
がアクターでないアトリビュート NonActor
のインスタンスをメンバに持っている。
それを、アクタープロキシ経由で外から呼び出している。
この場合、メンバとなるアクターでないオブジェクトには pykka_traversable
というフラグが必要になる。
このフラグを立てることで、アクタープロキシから呼び出すことができるようになる。
import pykka
class NonActor(object):
"""アクターのアトリビュートになる非アクタークラス"""
pykka_traversable = True
def greeting(self):
"""プロキシ経由で外部から呼びたいメソッド"""
return 'Hello, World!'
class MyActor(pykka.ThreadingActor):
def __init__(self):
super().__init__()
self.non_actor = NonActor()
def main():
actor_ref = MyActor.start()
actor_proxy = actor_ref.proxy()
future = actor_proxy.non_actor.greeting()
answer = future.get()
print(answer)
actor_ref.stop()
if __name__ == '__main__':
main()
このパターンでも返り値は Future になる。
上記のサンプルコードの実行結果は次の通り。
$ python methodpykka.py
Hello, World!
アクターモデルを使った並行処理
これまでのサンプルコードでは、アクターは定義しているものの平行処理をしていなかった。
なので、説明の都合上、仕方なかったとはいえ一体何のためにこんなことを、という印象だったかもしれない。
ここからは、ようやくアクターモデルを使って並行処理をしてみる。
アクターモデルの並行処理では、基本的にアクターをたくさん用意してタスクをそれぞれのアクターに振り分けていく。
アクターには、それぞれメッセージを処理するスレッドが起動しているので、処理をバラバラに並列で進めることができるわけ。
次のサンプルコードでは、 アクター MyActor
に擬似的に処理に時間のかかるメソッド process()
を用意している。
時間がかかる、というところはスレッドをスリープすることで表現した。
ここでは、時間のかかる処理を 4 回呼び出している。
それぞれの処理には 1 秒かかる。
一つのアクターで直列に処理すれば、単純に 4 秒かかるはずの処理を、ここでは二つのアクターに振り分けている。
from __future__ import division
import time
import pykka
class MyActor(pykka.ThreadingActor):
def process(self, processing_id, sleep_sec):
"""時間がかかる処理を模したメソッド"""
time.sleep(sleep_sec)
print('Completed: ID {0} in {1} s'.format(processing_id, sleep_sec))
def main():
actor_proxy_a = MyActor.start().proxy()
actor_proxy_b = MyActor.start().proxy()
start_time = time.time()
future1 = actor_proxy_a.process(1, 1)
future2 = actor_proxy_a.process(2, 1)
future3 = actor_proxy_b.process(3, 1)
future4 = actor_proxy_b.process(4, 1)
future1.get()
future2.get()
future3.get()
future4.get()
end_time = time.time()
elapsed_time = end_time - start_time
print('Elapsed Time: {0} s'.format(elapsed_time))
pykka.ActorRegistry.stop_all()
if __name__ == '__main__':
main()
さて、処理が終わるのに実際は何秒かかるだろうか?
上記のサンプルコードの実行結果は次の通り。
二つのアクターで処理した結果として本来一つのアクターなら 4 秒かかるところ 2 秒で完了している。
$ python cuncurrentpykka.py
Completed: ID 3 in 1 s
Completed: ID 1 in 1 s
Completed: ID 2 in 1 s
Completed: ID 4 in 1 s
Elapsed Time: 2.0071158409118652 s
アクターモデルでも設計によっては競合が起こる
アクターモデルを使えば、競合が起きないというような説明をされることがたまにあるようだ。
しかし、その「競合が起きない」というのは、より低レイヤーな側面からアクターモデルを見たときの話だと思う。
あくまで、アクターのメッセージキューを処理するスレッドが一つだけなので、アクターの中で処理が競合しないというのに過ぎない。
より高レイヤーに、アプリケーションという側面からアクターモデルを見たとき、競合は容易に起こる。
それぞれの競合は全く別の次元の話なんだけど、もし混ぜて考えてしまうと危険だ。
次は、その競合について見てみよう。
サンプルコードでは、銀行口座を模したモデルを用意した。
AccountActor
という銀行口座を模したアクターを UserActor
というユーザを模したアクターが操作する。
UserActor
はまず銀行口座から預金残高を読み取って、その値を元に預金残高を更新する。
ここでポイントなのは AccountActor
の API がアトミックになっていないという点だ。
つまり、銀行口座の読み取りから更新までの間に別のアクターからの処理が割り込む余地がある。
サンプルコードでは、口座の読み取りから更新までの間に意図的にスリープを入れることで競合が起こりやすいようにしている。
import time
import pykka
class AccountActor(pykka.ThreadingActor):
"""銀行口座を模したアクター"""
def __init__(self, initial_money=0):
super(AccountActor, self).__init__()
self._money = initial_money
def get_balance(self):
return self._money
def set_balance(self, money):
self._money = money
class UserActor(pykka.ThreadingActor):
"""銀行口座を使うユーザを模したアクター"""
def __init__(self, username, account_actor):
super(UserActor, self).__init__()
self._username = username
self._account_actor = account_actor
def save(self, amount):
future = self._account_actor.get_balance()
balance = future.get()
print('{0} read balance: {1}'.format(self._username, balance))
time.sleep(1)
print('{0} sleep...'.format(self._username))
new_balance = balance + amount
future = self._account_actor.set_balance(new_balance)
future.get()
print('{0} set balance: {1}'.format(self._username, new_balance))
def main():
account_actor_ref = AccountActor.start(100)
account_actor_proxy = account_actor_ref.proxy()
user_a_ref = UserActor.start('UserA', account_actor_proxy)
user_a_proxy = user_a_ref.proxy()
user_b_ref = UserActor.start('UserB', account_actor_proxy)
user_b_proxy = user_b_ref.proxy()
future_a = user_a_proxy.save(50)
future_b = user_b_proxy.save(50)
future_a.get()
future_b.get()
print('Final Balance: {0}'.format(account_actor_proxy.get_balance().get()))
pykka.ActorRegistry.stop_all()
if __name__ == '__main__':
main()
二つのユーザを模したアクターは銀行残高をそれぞれ 50
だけ増やすべく処理を実施する。
初期の預金残高は 100
なので、最終的には 200
になっていないとおかしい。
では、上記のサンプルコードを実行してみよう。
$ python conflictpykka.py
UserA read balance: 100
UserB read balance: 100
UserA sleep...
UserB sleep...
UserA set balance: 150
UserB set balance: 150
Final Balance: 150
最終的な口座残高は 150
となってしまった。
見事に競合が発生している。
問題は、アクターの API が更新に対してアトミックになっていなかった点だ。
次の改良したサンプルコードでは、預金残高を更新するメソッドを update_balance()
という形でアトミックにしている。
現在の預金残高を元に、口座のアクター自体が更新をかけるので、間に別のアクターの処理が入り込む余地はない。
アクターの中はメッセージキューを処理する一つのスレッドで処理されているので、同時に同じメソッドが走る心配はない。
import pykka
class AccountActor(pykka.ThreadingActor):
def __init__(self, initial_money=0):
super(AccountActor, self).__init__()
self._money = initial_money
def get_balance(self):
return self._money
def update_balance(self, money):
self._money = self._money + money
class UserActor(pykka.ThreadingActor):
def __init__(self, username, account_actor):
super(UserActor, self).__init__()
self._username = username
self._account_actor = account_actor
def save(self, amount):
future = self._account_actor.update_balance(amount)
future.get()
print('{0} update balance: {1}'.format(self._username, amount))
def main():
account_actor_ref = AccountActor.start(100)
account_actor_proxy = account_actor_ref.proxy()
user_a_ref = UserActor.start('UserA', account_actor_proxy)
user_a_proxy = user_a_ref.proxy()
user_b_ref = UserActor.start('UserB', account_actor_proxy)
user_b_proxy = user_b_ref.proxy()
future_a = user_a_proxy.save(50)
future_b = user_b_proxy.save(50)
future_a.get()
future_b.get()
print('Final Balance: {0}'.format(account_actor_proxy.get_balance().get()))
pykka.ActorRegistry.stop_all()
if __name__ == '__main__':
main()
上記のサンプルコードを実行した結果は次の通り。
今度はちゃんと預金残高が 200
に更新されている。
$ python conflictpykka2.py
UserA update balance: 50
UserB update balance: 50
Final Balance: 200
アクターモデルでも設計によってはデッドロックが起こる
典型的なデッドロックは、マルチスレッドでリソースをロックする順番が不定になっていたときに起こる。
アクターモデルはロックを使わないので、それならデッドロックが起こらないかというと、そんなことはない。
次のサンプルコードではデッドロックを発生させている。
ActorA
は ActorB
のメソッドを呼んで、ActorB
は ActorA
のメソッドを呼ぶ、というように操作が交差している。
それぞれのメソッドは各アクターのスレッドで非同期に処理されるので、返り値で得られる Future を get()
すると完了するまでブロックする。
すなわち、双方がお互いの処理の完了を待ってデッドロックを起こしてしまう。
import logging
import signal
import os
import pykka
from pykka import debug
class ActorA(pykka.ThreadingActor):
def method_a(self, actor_b_proxy):
future = actor_b_proxy.method_b()
future.get()
class ActorB(pykka.ThreadingActor):
def __init__(self, actor_a_proxy):
super(ActorB, self).__init__()
self.actor_a_proxy = actor_a_proxy
def method_b(self):
future = self.actor_a_proxy.method_a()
future.get()
XXX
def main():
"""意図的にデッドロックを起こしてみる"""
logging.basicConfig(level=logging.DEBUG)
signal.signal(signal.SIGUSR1, debug.log_thread_tracebacks)
actor_a_ref = ActorA.start()
actor_a_proxy = actor_a_ref.proxy()
actor_b_ref = ActorB.start(actor_a_proxy)
actor_b_proxy = actor_b_ref.proxy()
actor_a_proxy.method_a(actor_b_proxy)
pid = os.getpid()
print(f'PID: {pid}')
if __name__ == '__main__':
main()
上記を実行すると、何も表示されずに処理が止まる。
$ python deadlockpykka.py
DEBUG:pykka:Registered ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Starting ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Registered ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
DEBUG:pykka:Starting ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
PID: 9199
サンプルコードでプロセス ID を表示するようにしているところに気づいただろうか?
また Pykka の debug.log_thread_tracebacks
という関数がシグナル SIGUSR1
に対して登録されている。
そこで、別のターミナルからこのハンドラに対してシグナルを送ってみよう。
$ kill -SIGUSR1 9199
すると、今のスレッドが何処を処理しているのかトレースバックが表示される。
この表示からは、二つのアクターのスレッドが future.get()
でブロックしていることが分かる。
$ python deadlockpykka.py
DEBUG:pykka:Registered ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Starting ActorA (urn:uuid:5b470837-65b9-408d-9843-23982f2efcc5)
DEBUG:pykka:Registered ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
DEBUG:pykka:Starting ActorB (urn:uuid:02ab87fd-705b-4ec4-9718-78870b99eb90)
PID: 9199
CRITICAL:pykka:Current state of ActorB-2 (ident: 123145451573248):
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 884, in _bootstrap
self._bootstrap_inner()
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
response = self._handle_receive(message)
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 295, in _handle_receive
return callee(*message['args'], **message['kwargs'])
File "deadlockpykka.py", line 31, in method_b
future.get()
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 50, in get
self._data = self._queue.get(True, timeout)
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/queue.py", line 164, in get
self.not_empty.wait()
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 295, in wait
waiter.acquire()
CRITICAL:pykka:Current state of ActorA-1 (ident: 123145446318080):
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 884, in _bootstrap
self._bootstrap_inner()
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 201, in _actor_loop
response = self._handle_receive(message)
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/actor.py", line 295, in _handle_receive
return callee(*message['args'], **message['kwargs'])
File "deadlockpykka.py", line 18, in method_a
future.get()
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/threading.py", line 50, in get
self._data = self._queue.get(True, timeout)
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/queue.py", line 164, in get
self.not_empty.wait()
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 295, in wait
waiter.acquire()
CRITICAL:pykka:Current state of MainThread (ident: 140736596534208):
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1290, in _shutdown
t.join()
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1056, in join
self._wait_for_tstate_lock()
File "/Users/amedama/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
File "/Users/amedama/.virtualenvs/py36/lib/python3.6/site-packages/pykka/debug.py", line 68, in log_thread_tracebacks
stack = ''.join(traceback.format_stack(frame))
Pykka の並行処理に関しては、上記のようにしてデバッグができる。
スレッドプールっぽいものを作って並行処理をしてみる
次はもうちょっと実践的に、スレッドプールっぽく並行処理をしてみよう。
とはいえ、あまり汎用的には作らずベタ書きだけど。
次のサンプルコードでは、先ほどの例と同じように MyActor
に処理に時間のかかるメソッド process()
を用意している。
アクターを配列で 10 個生成した上で、タスクを 20 回呼び出してみよう。
ただし、通常のスレッドプールのような形で終わったものから新しいタスクを割り当てる、というものではない。
あらかじめ、各アクターに決まった番号のタスクを割り当てるような形になっている。
from __future__ import division
import time
import random
import pykka
class MyActor(pykka.ThreadingActor):
def process(self, processing_id):
"""時間がかかる処理を模したメソッド"""
sleep_sec = random.randint(1, 10) / 10
time.sleep(sleep_sec)
print('Completed: ID {0} in {1} s'.format(processing_id, sleep_sec))
return processing_id
def main():
actor_proxy_pool = [MyActor.start().proxy() for _ in range(10)]
futures = [
actor_proxy_pool[i % len(actor_proxy_pool)].process(i)
for i in range(20)
]
answers = pykka.get_all(futures)
print('*** Calculation Completed ***')
for answer in answers:
print('ID: {0}'.format(answer))
pykka.ActorRegistry.stop_all()
if __name__ == '__main__':
main()
実行結果は次の通り。
各タスクが順番バラバラで実行されているのが分かる。
$ python tpoolpykka.py
Completed: ID 3 in 0.1 s
Completed: ID 0 in 0.3 s
Completed: ID 7 in 0.3 s
Completed: ID 13 in 0.2 s
Completed: ID 1 in 0.5 s
Completed: ID 4 in 0.5 s
Completed: ID 5 in 0.6 s
Completed: ID 2 in 0.7 s
Completed: ID 8 in 0.7 s
Completed: ID 10 in 0.4 s
Completed: ID 17 in 0.4 s
Completed: ID 6 in 0.8 s
Completed: ID 9 in 1.0 s
Completed: ID 15 in 0.5 s
Completed: ID 11 in 0.6 s
Completed: ID 12 in 0.5 s
Completed: ID 16 in 0.5 s
Completed: ID 14 in 0.9 s
Completed: ID 19 in 0.5 s
Completed: ID 18 in 0.9 s
*** Calculation Completed ***
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9
ID: 10
ID: 11
ID: 12
ID: 13
ID: 14
ID: 15
ID: 16
ID: 17
ID: 18
ID: 19
出力の最後を見て分かる通り、実行結果として得られる配列の順番は変わっていない。
これは、単純に内部でやっているのが順番に Future#get()
をしていく、という処理だからだ。
とはいえ、最終的に得られる結果の順番が入れ替わらないという特性は並行処理をする上で優れた特徴だと思う。
場合によっては終わったものから返ってきて、後からソートし直さないといけないことなんかもあるし。
ラウンドロビンで処理するメッセージルータを実装してみる
次は、お試しで作ってみたメッセージルータについて。
これはほんとに試しに作ってみただけなのでプルーフオブコンセプトと思ってほしい。
次のサンプルコードでは MyActor
をラップするメッセージルータの RoundRobinRouter
を定義している。
RoundRobinRouter
では、内部的に MyActor
のプロキシを複数生成して、処理を振り分ける。
先ほどのべた書きのスレッドプールを、もうちょっと洗練させた感じ。
RoundRobinRouter
は、まず処理をフォワードしたいアクターと同時並行数 (アクターをいくつ生成するか) を指定して起動する。
その上で RoundRobinRouter#forward()
メソッドに呼び出したいメソッド名と引数を指定するだけ。
注意点としては、返り値が二重の Future になっているところ。
from __future__ import division
import time
import random
import pykka
class MyActor(pykka.ThreadingActor):
def process(self, processing_id):
"""時間がかかる処理を模したメソッド"""
sleep_sec = random.randint(1, 10) / 10
time.sleep(sleep_sec)
print('Completed: ID {0} in {1}s by {2}'.format(
processing_id,
sleep_sec,
self.actor_urn,
))
return processing_id
class RoundRobinRouter(pykka.ThreadingActor):
"""ラップしたアクターにラウンドロビンで処理を割り当てていくメッセージルータ"""
def __init__(self, actor_type, size):
super(RoundRobinRouter, self).__init__()
self._proxy_pool = [actor_type.start().proxy()
for _ in range(size)]
self._index_gen = self._next_actor_index()
def forward(self, name, *args, **kwargs):
actor_index = next(self._index_gen)
actor_proxy = self._proxy_pool[actor_index]
attribute = getattr(actor_proxy, name)
return attribute(*args, **kwargs)
def _next_actor_index(self):
"""次の処理対象のアクターのインデックスを生成するジェネレータ"""
i = 0
while True:
yield i
i = (i + 1) % len(self._proxy_pool)
def main():
router_ref = RoundRobinRouter.start(MyActor, 5)
router_proxy = router_ref.proxy()
futures = [router_proxy.forward('process', i)
for i in range(10)]
inner_futures = pykka.get_all(futures)
answers = pykka.get_all(inner_futures)
print('*** Calculation Completed ***')
for answer in answers:
print('ID: {0}'.format(answer))
pykka.ActorRegistry.stop_all()
if __name__ == '__main__':
main()
上記の実行結果は次の通り。
タスクを処理したアクターの識別子も同時に出力するようになっている。
見ると、タスクの識別子が 5 つの周期で同じアクターによって処理されていることが分かる。
上手く動作しているようだ。
$ python routerpykka.py
Completed: ID 1 in 0.3s by urn:uuid:a5d3f2e6-6d59-4351-9be8-e47023428e99
Completed: ID 2 in 0.6s by urn:uuid:ee6f93cf-66df-4500-b416-84defff8b195
Completed: ID 3 in 0.6s by urn:uuid:c6003463-87fc-4988-b8bb-34186e867630
Completed: ID 0 in 0.7s by urn:uuid:9a2fe8bc-49fe-4b19-96d5-6658ecae3677
Completed: ID 7 in 0.1s by urn:uuid:ee6f93cf-66df-4500-b416-84defff8b195
Completed: ID 4 in 0.8s by urn:uuid:0e3b5668-6825-43f5-bd95-9cf2c706a03d
Completed: ID 6 in 0.5s by urn:uuid:a5d3f2e6-6d59-4351-9be8-e47023428e99
Completed: ID 9 in 0.3s by urn:uuid:0e3b5668-6825-43f5-bd95-9cf2c706a03d
Completed: ID 5 in 0.8s by urn:uuid:9a2fe8bc-49fe-4b19-96d5-6658ecae3677
Completed: ID 8 in 1.0s by urn:uuid:c6003463-87fc-4988-b8bb-34186e867630
*** Calculation Completed ***
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9
まとめ
今回は Python でアクターモデルを実装するためのフレームワークである Pykka を試してみた。
アクターモデルというのは、並行処理のプログラミングモデルの一つ。
ただし、マルチスレッドやマルチプロセスよりも、抽象度の高い概念になっている。
対比するとしたら共有メモリやロック、RPC などがそれに当たると思う。
アクターモデルの概念を理解する上で重要なのは、アクターを構成する三つの要素を理解すること。
一つ目がメッセージキュー、二つ目がメッセージハンドラ、三つ目がそれらをディスパッチするスレッド。
生成されたアクターには、それぞれにこの三つの要素が必ず備わっている。
メッセージキューとスレッドはあらかじめアクターごとに一つずつ生成されるし、メッセージハンドラは自分で定義する。
アクターモデルでは、内部で競合を起こさないようにアクターの内外が非同期の壁で分断されている。
アクターの内部に、外側から同期的に直接触れることはできない。
必ず、触るとしたら Future を経由することになる。
同期的に触っているように見えたとしても、内部的には必ず経由している。
アクターモデルで並行処理を実現するには、アクターを横にたくさん並べてタスクを割り当てていく。
ただし、それぞれのタスクはアトミックに作られていないといけない。
アクターモデルで競合が起こらないというのは、あくまで「低レイヤーで内部的には」という但し書きがつく。
また、ロックがないからといって、デッドロックが起こらないというわけでもない。
Pykka の場合、Akka の機能を全て揃えているわけではないので、割りとよく使いそうな機能でも自分で書く必要がある。
また、正直アプリケーションで直接 Pykka を使うというシチュエーションは、あまり無いのかもしれないと思った。
なぜなら Pykka の提供する API は、直接使うにはあまりに低レイヤーすぎるからだ。
なので、Pykka をベースにした何らかのフレームワークをまず書いて、その上でアプリケーションを組むことになりそうな印象を受けた。
いじょう。