Python の concurrent.futures はバージョン 3.2 で追加された並行・並列処理用のパッケージ。
似たようなパッケージにはこれまでにも threading や multiprocessing があったんだけど、これはそれよりも高レベルの API になっている。
デフォルトでスレッド・プロセスプールが使えたり、マルチスレッドとマルチプロセスがほとんどコードを変えずに使い分けられるメリットがある。
下準備
使う Python のバージョンが 3.2 未満のときは PyPI にあるバックポート版のパッケージをインストールする必要がある。
$ pip install futures
ただし、今回使う環境は Python 3.5 なので関係ない。
$ python --version
Python 3.5.1
$ sw_vers
ProductName: Mac OS X
ProductVersion: 10.11.3
BuildVersion: 15D21
シングルスレッドの場合
今回は扱うものが並行・並列処理なので、計算量がたくさん必要になるものを題材とする。
試しに、いくつかの大きな数を素数か判定するプログラムをシングルスレッドで書いてみよう。
アルゴリズムとしては単純な試し割り法を使った。
import math
def _is_prime(n):
"""数値が素数か判定する関数"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def _calc():
values = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419,
]
for target in values:
result = _is_prime(target)
msg = '{n}: {prime}'.format(n=target, prime=result)
print(msg)
def main():
import time
start = time.time()
_calc()
end = time.time()
elapsed_time = end - start
msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time)
print(msg)
if __name__ == '__main__':
main()
これを実行してみると、それぞれの数をリストに収まっている順番で判定していくことになる。
$ python singlethread.py
112272535095293: True
112582705942171: True
112272535095293: True
115280095190773: True
115797848077099: True
1099726899285419: False
Time: 3.3549559116363525 sec
シングルスレッド版は 3.35 秒かかった。
マルチスレッドの場合
次は先ほどのサンプルをマルチスレッドにしてみよう。
ソースコードの細かい説明はコメントで行っている。
import math
from concurrent import futures
def _is_prime(n):
"""数値が素数か判定する関数"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def _calc():
values = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419,
]
with futures.ThreadPoolExecutor() as executor:
mappings = {executor.submit(_is_prime, n): n for n in values}
for future in futures.as_completed(mappings):
target = mappings[future]
result = future.result()
msg = '{n}: {prime}'.format(n=target, prime=result)
print(msg)
def main():
import time
start = time.time()
_calc()
end = time.time()
elapsed_time = end - start
msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time)
print(msg)
if __name__ == '__main__':
main()
次も同じように実行してみよう。
$ python multithread.py
1099726899285419: False
112582705942171: True
112272535095293: True
112272535095293: True
115280095190773: True
115797848077099: True
Time: 3.3795218467712402 sec
先ほどとかかっている時間はほとんど変わっていない。
マルチスレッドにしたのに何故かと不思議に思われるかもしれない。
ただ、これにはちゃんとした原因がある。
CPython には GIL (Global Interpretor Lock) というものがあって、マルチスレッドにしても同時に実行できるスレッドはひとつだけだからだ。
むしろスレッドの生成コストの分だけ遅くなっても何ら不思議ではない。
とはいえ同時に実行できるスレッドがひとつなだけで、処理は「並行」に進められている。
その証拠に、先ほどとは数が出力されている順番が異なることに気づくことだろう。
マルチプロセスの場合
次は先ほどのサンプルをマルチプロセスにした場合。
なんと修正するところはたったひとつ。
先ほど使った ThreadPoolExecutor を ProcessPoolExecutor に変えるだけ。
これでマルチスレッドの実行がマルチプロセスでの実行に変わる。
import math
from concurrent import futures
def _is_prime(n):
"""数値が素数か判定する関数"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def _calc():
values = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419,
]
with futures.ProcessPoolExecutor() as executor:
mappings = {executor.submit(_is_prime, n): n for n in values}
for future in futures.as_completed(mappings):
target = mappings[future]
result = future.result()
msg = '{n}: {prime}'.format(n=target, prime=result)
print(msg)
def main():
import time
start = time.time()
_calc()
end = time.time()
elapsed_time = end - start
msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time)
print(msg)
if __name__ == '__main__':
main()
同じように実行してみよう。
$ python multiprocess.py
112272535095293: True
112582705942171: True
112272535095293: True
115280095190773: True
1099726899285419: False
115797848077099: True
Time: 1.3142600059509277 sec
今度は複数のプロセスで「並列」に処理が進んだから実行時間が短縮されている。
まとめ
今回は Python の標準ライブラリ concurrent.futures を使った並行・並列処理について紹介した。
このライブラリを使うとマルチスレッドとマルチプロセスでほとんどソースコードを変えずに使い分けることができる。
ただし、マルチプロセスを使うときはひとつだけ注意が必要だ。
マルチプロセスの場合、プロセス間でのオブジェクトのやり取りは Pickle にもとづいて行われる。
そのため、並列化する関数の引数と返り値には Pickle 化できないオブジェクトを使うことができない。
Pickle についてはこのブログで以前に書いたことがあるのであわせて読んでもらいたい。
blog.amedama.jp