Python の concurrent.futures はバージョン 3.2 で追加された並行・並列処理用のパッケージ。 似たようなパッケージにはこれまでにも threading や multiprocessing があったんだけど、これはそれよりも高レベルの API になっている。 デフォルトでスレッド・プロセスプールが使えたり、マルチスレッドとマルチプロセスがほとんどコードを変えずに使い分けられるメリットがある。
下準備
使う Python のバージョンが 3.2 未満のときは PyPI にあるバックポート版のパッケージをインストールする必要がある。
$ pip install futures
ただし、今回使う環境は Python 3.5 なので関係ない。
$ python --version Python 3.5.1 $ sw_vers ProductName: Mac OS X ProductVersion: 10.11.3 BuildVersion: 15D21
シングルスレッドの場合
今回は扱うものが並行・並列処理なので、計算量がたくさん必要になるものを題材とする。 試しに、いくつかの大きな数を素数か判定するプログラムをシングルスレッドで書いてみよう。 アルゴリズムとしては単純な試し割り法を使った。
#!/usr/bin/env python # -*- coding: utf-8 -*- import math def _is_prime(n): """数値が素数か判定する関数""" if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True def _calc(): # 調査対象の数値 values = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419, ] for target in values: # 素数かチェックして結果を表示する result = _is_prime(target) msg = '{n}: {prime}'.format(n=target, prime=result) print(msg) def main(): import time start = time.time() _calc() end = time.time() # 計算にかかった時間を表示する elapsed_time = end - start msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time) print(msg) if __name__ == '__main__': main()
これを実行してみると、それぞれの数をリストに収まっている順番で判定していくことになる。
$ python singlethread.py 112272535095293: True 112582705942171: True 112272535095293: True 115280095190773: True 115797848077099: True 1099726899285419: False Time: 3.3549559116363525 sec
シングルスレッド版は 3.35 秒かかった。
マルチスレッドの場合
次は先ほどのサンプルをマルチスレッドにしてみよう。 ソースコードの細かい説明はコメントで行っている。
#!/usr/bin/env python # -*- coding: utf-8 -*- import math from concurrent import futures def _is_prime(n): """数値が素数か判定する関数""" if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True def _calc(): # 調査対象の数値 values = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419, ] with futures.ThreadPoolExecutor() as executor: # 並行処理する対象とそれに対応する値 (処理の引数) を辞書で用意する mappings = {executor.submit(_is_prime, n): n for n in values} # futures.as_completed() は処理がおわったものから結果を返していくジェネレータ for future in futures.as_completed(mappings): # 完了した処理に対応する引数を辞書から取得する target = mappings[future] # 処理結果を取得する result = future.result() # 結果を表示する msg = '{n}: {prime}'.format(n=target, prime=result) print(msg) def main(): import time start = time.time() _calc() end = time.time() # 計算にかかった時間を表示する elapsed_time = end - start msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time) print(msg) if __name__ == '__main__': main()
次も同じように実行してみよう。
$ python multithread.py 1099726899285419: False 112582705942171: True 112272535095293: True 112272535095293: True 115280095190773: True 115797848077099: True Time: 3.3795218467712402 sec
先ほどとかかっている時間はほとんど変わっていない。 マルチスレッドにしたのに何故かと不思議に思われるかもしれない。 ただ、これにはちゃんとした原因がある。 CPython には GIL (Global Interpretor Lock) というものがあって、マルチスレッドにしても同時に実行できるスレッドはひとつだけだからだ。 むしろスレッドの生成コストの分だけ遅くなっても何ら不思議ではない。 とはいえ同時に実行できるスレッドがひとつなだけで、処理は「並行」に進められている。 その証拠に、先ほどとは数が出力されている順番が異なることに気づくことだろう。
マルチプロセスの場合
次は先ほどのサンプルをマルチプロセスにした場合。 なんと修正するところはたったひとつ。 先ほど使った ThreadPoolExecutor を ProcessPoolExecutor に変えるだけ。 これでマルチスレッドの実行がマルチプロセスでの実行に変わる。
#!/usr/bin/env python # -*- coding: utf-8 -*- import math from concurrent import futures def _is_prime(n): """数値が素数か判定する関数""" if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True def _calc(): # 調査対象の数値 values = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419, ] with futures.ProcessPoolExecutor() as executor: # 並行処理する対象とそれに対応する値 (処理の引数) を辞書で用意する mappings = {executor.submit(_is_prime, n): n for n in values} # futures.as_completed() は処理がおわったものから結果を返していくジェネレータ for future in futures.as_completed(mappings): # 完了した処理に対応する引数を辞書から取得する target = mappings[future] # 処理結果を取得する result = future.result() # 結果を表示する msg = '{n}: {prime}'.format(n=target, prime=result) print(msg) def main(): import time start = time.time() _calc() end = time.time() # 計算にかかった時間を表示する elapsed_time = end - start msg = 'Time: {elapsed_time} sec'.format(elapsed_time=elapsed_time) print(msg) if __name__ == '__main__': main()
同じように実行してみよう。
$ python multiprocess.py 112272535095293: True 112582705942171: True 112272535095293: True 115280095190773: True 1099726899285419: False 115797848077099: True Time: 1.3142600059509277 sec
今度は複数のプロセスで「並列」に処理が進んだから実行時間が短縮されている。
まとめ
今回は Python の標準ライブラリ concurrent.futures を使った並行・並列処理について紹介した。 このライブラリを使うとマルチスレッドとマルチプロセスでほとんどソースコードを変えずに使い分けることができる。 ただし、マルチプロセスを使うときはひとつだけ注意が必要だ。 マルチプロセスの場合、プロセス間でのオブジェクトのやり取りは Pickle にもとづいて行われる。 そのため、並列化する関数の引数と返り値には Pickle 化できないオブジェクトを使うことができない。 Pickle についてはこのブログで以前に書いたことがあるのであわせて読んでもらいたい。
スマートPythonプログラミング: Pythonのより良い書き方を学ぶ
- 作者: もみじあめ
- 発売日: 2016/03/12
- メディア: Kindle版
- この商品を含むブログを見る